001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.coord; 020 021import com.google.common.collect.Lists; 022 023import org.apache.commons.lang.StringUtils; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.oozie.ErrorCode; 026import org.apache.oozie.client.OozieClient; 027import org.apache.oozie.command.CommandException; 028import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil; 029import org.apache.oozie.dependency.URIHandler; 030import org.apache.oozie.dependency.URIHandler.Context; 031import org.apache.oozie.service.Services; 032import org.apache.oozie.service.URIHandlerService; 033import org.apache.oozie.util.DateUtils; 034import org.apache.oozie.util.ELEvaluator; 035import org.apache.oozie.util.ParamChecker; 036import org.apache.oozie.util.XLog; 037import org.jdom.JDOMException; 038 039import java.net.URI; 040import java.util.ArrayList; 041import java.util.Calendar; 042import java.util.Date; 043import java.util.GregorianCalendar; 044import java.util.List; 045import java.util.TimeZone; 046 047/** 048 * This class implements the EL function related to coordinator 049 */ 050 051public class CoordELFunctions { 052 final public static String DATASET = "oozie.coord.el.dataset.bean"; 053 final public static String COORD_ACTION = "oozie.coord.el.app.bean"; 054 final public static String CONFIGURATION = "oozie.coord.el.conf"; 055 final public static String LATEST_EL_USE_CURRENT_TIME = "oozie.service.ELService.latest-el.use-current-time"; 056 // INSTANCE_SEPARATOR is used to separate multiple directories into one tag. 057 final public static String INSTANCE_SEPARATOR = "#"; 058 final public static String DIR_SEPARATOR = ","; 059 // TODO: in next release, support flexibility 060 private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS"; 061 062 public static final long MINUTE_MSEC = 60 * 1000L; 063 public static final long HOUR_MSEC = 60 * MINUTE_MSEC; 064 public static final long DAY_MSEC = 24 * HOUR_MSEC; 065 /** 066 * Used in defining the frequency in 'day' unit. <p> domain: <code> val > 0</code> and should be integer. 067 * 068 * @param val frequency in number of days. 069 * @return number of days and also set the frequency timeunit to "day" 070 */ 071 public static int ph1_coord_days(int val) { 072 val = ParamChecker.checkGTZero(val, "n"); 073 ELEvaluator eval = ELEvaluator.getCurrent(); 074 eval.setVariable("timeunit", TimeUnit.DAY); 075 eval.setVariable("endOfDuration", TimeUnit.NONE); 076 return val; 077 } 078 079 /** 080 * Used in defining the frequency in 'month' unit. <p> domain: <code> val > 0</code> and should be integer. 081 * 082 * @param val frequency in number of months. 083 * @return number of months and also set the frequency timeunit to "month" 084 */ 085 public static int ph1_coord_months(int val) { 086 val = ParamChecker.checkGTZero(val, "n"); 087 ELEvaluator eval = ELEvaluator.getCurrent(); 088 eval.setVariable("timeunit", TimeUnit.MONTH); 089 eval.setVariable("endOfDuration", TimeUnit.NONE); 090 return val; 091 } 092 093 /** 094 * Used in defining the frequency in 'hour' unit. <p> parameter value domain: <code> val > 0</code> and should 095 * be integer. 096 * 097 * @param val frequency in number of hours. 098 * @return number of minutes and also set the frequency timeunit to "minute" 099 */ 100 public static int ph1_coord_hours(int val) { 101 val = ParamChecker.checkGTZero(val, "n"); 102 ELEvaluator eval = ELEvaluator.getCurrent(); 103 eval.setVariable("timeunit", TimeUnit.MINUTE); 104 eval.setVariable("endOfDuration", TimeUnit.NONE); 105 return val * 60; 106 } 107 108 /** 109 * Used in defining the frequency in 'minute' unit. <p> domain: <code> val > 0</code> and should be integer. 110 * 111 * @param val frequency in number of minutes. 112 * @return number of minutes and also set the frequency timeunit to "minute" 113 */ 114 public static int ph1_coord_minutes(int val) { 115 val = ParamChecker.checkGTZero(val, "n"); 116 ELEvaluator eval = ELEvaluator.getCurrent(); 117 eval.setVariable("timeunit", TimeUnit.MINUTE); 118 eval.setVariable("endOfDuration", TimeUnit.NONE); 119 return val; 120 } 121 122 /** 123 * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p> Every instance will 124 * start at 00:00 hour of each day. <p> domain: <code> val > 0</code> and should be integer. 125 * 126 * @param val frequency in number of days. 127 * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day" 128 */ 129 public static int ph1_coord_endOfDays(int val) { 130 val = ParamChecker.checkGTZero(val, "n"); 131 ELEvaluator eval = ELEvaluator.getCurrent(); 132 eval.setVariable("timeunit", TimeUnit.DAY); 133 eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY); 134 return val; 135 } 136 137 /** 138 * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p> Every instance will 139 * start at first day of each month at 00:00 hour. <p> domain: <code> val > 0</code> and should be integer. 140 * 141 * @param val: frequency in number of months. 142 * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month" 143 */ 144 public static int ph1_coord_endOfMonths(int val) { 145 val = ParamChecker.checkGTZero(val, "n"); 146 ELEvaluator eval = ELEvaluator.getCurrent(); 147 eval.setVariable("timeunit", TimeUnit.MONTH); 148 eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH); 149 return val; 150 } 151 152 /** 153 * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p> Depends on: <p> 154 * 1. Timezone of both dataset and job <p> 2. Action creation Time 155 * 156 * @return difference in minutes (DataSet TZ Offset - Application TZ offset) 157 */ 158 public static int ph2_coord_tzOffset() { 159 long actionCreationTime = getActionCreationtime().getTime(); 160 TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ"); 161 TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ"); 162 return (dsTZ.getOffset(actionCreationTime) - jobTZ.getOffset(actionCreationTime)) / (1000 * 60); 163 } 164 165 public static int ph3_coord_tzOffset() { 166 return ph2_coord_tzOffset(); 167 } 168 169 /** 170 * Returns a date string that is offset from 'strBaseDate' by the amount specified. The unit can be one of 171 * DAY, MONTH, HOUR, MINUTE, MONTH. 172 * 173 * @param strBaseDate The base date 174 * @param offset any number 175 * @param unit one of DAY, MONTH, HOUR, MINUTE, MONTH 176 * @return the offset date string 177 * @throws Exception 178 */ 179 public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception { 180 Calendar baseCalDate = DateUtils.getCalendar(strBaseDate); 181 StringBuilder buffer = new StringBuilder(); 182 baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset); 183 buffer.append(DateUtils.formatDateOozieTZ(baseCalDate)); 184 return buffer.toString(); 185 } 186 187 public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception { 188 return ph2_coord_dateOffset(strBaseDate, offset, unit); 189 } 190 191 /** 192 * Returns a date string that is offset from 'strBaseDate' by the difference from Oozie processing timezone to the given 193 * timezone. It will account for daylight saving time based on the given 'strBaseDate' and 'timezone'. 194 * 195 * @param strBaseDate The base date 196 * @param timezone 197 * @return the offset date string 198 * @throws Exception 199 */ 200 public static String ph2_coord_dateTzOffset(String strBaseDate, String timezone) throws Exception { 201 Calendar baseCalDate = DateUtils.getCalendar(strBaseDate); 202 StringBuilder buffer = new StringBuilder(); 203 baseCalDate.setTimeZone(DateUtils.getTimeZone(timezone)); 204 buffer.append(DateUtils.formatDate(baseCalDate)); 205 return buffer.toString(); 206 } 207 208 public static String ph3_coord_dateTzOffset(String strBaseDate, String timezone) throws Exception{ 209 return ph2_coord_dateTzOffset(strBaseDate, timezone); 210 } 211 212 /** 213 * Determine the date-time in Oozie processing timezone of n-th future available dataset instance 214 * from nominal Time but not beyond the instance specified as 'instance. 215 * <p> 216 * It depends on: 217 * <p> 218 * 1. Data set frequency 219 * <p> 220 * 2. Data set Time unit (day, month, minute) 221 * <p> 222 * 3. Data set Time zone/DST 223 * <p> 224 * 4. End Day/Month flag 225 * <p> 226 * 5. Data set initial instance 227 * <p> 228 * 6. Action Creation Time 229 * <p> 230 * 7. Existence of dataset's directory 231 * 232 * @param n :instance count 233 * <p> 234 * domain: n >= 0, n is integer 235 * @param instance: How many future instance it should check? value should 236 * be >=0 237 * @return date-time in Oozie processing timezone of the n-th instance 238 * <p> 239 * @throws Exception 240 */ 241 public static String ph3_coord_future(int n, int instance) throws Exception { 242 ParamChecker.checkGEZero(n, "future:n"); 243 ParamChecker.checkGTZero(instance, "future:instance"); 244 if (isSyncDataSet()) {// For Sync Dataset 245 return coord_future_sync(n, instance); 246 } 247 else { 248 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 249 } 250 } 251 252 /** 253 * Determine the date-time in Oozie processing timezone of the future available dataset instances 254 * from start to end offsets from nominal Time but not beyond the instance specified as 'instance'. 255 * <p> 256 * It depends on: 257 * <p> 258 * 1. Data set frequency 259 * <p> 260 * 2. Data set Time unit (day, month, minute) 261 * <p> 262 * 3. Data set Time zone/DST 263 * <p> 264 * 4. End Day/Month flag 265 * <p> 266 * 5. Data set initial instance 267 * <p> 268 * 6. Action Creation Time 269 * <p> 270 * 7. Existence of dataset's directory 271 * 272 * @param start : start instance offset 273 * <p> 274 * domain: start >= 0, start is integer 275 * @param end : end instance offset 276 * <p> 277 * domain: end >= 0, end is integer 278 * @param instance: How many future instance it should check? value should 279 * be >=0 280 * @return date-time in Oozie processing timezone of the instances from start to end offsets 281 * delimited by comma. 282 * <p> 283 * @throws Exception 284 */ 285 public static String ph3_coord_futureRange(int start, int end, int instance) throws Exception { 286 ParamChecker.checkGEZero(start, "future:n"); 287 ParamChecker.checkGEZero(end, "future:n"); 288 ParamChecker.checkGTZero(instance, "future:instance"); 289 if (isSyncDataSet()) {// For Sync Dataset 290 return coord_futureRange_sync(start, end, instance); 291 } 292 else { 293 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 294 } 295 } 296 297 private static String coord_future_sync(int n, int instance) throws Exception { 298 return coord_futureRange_sync(n, n, instance); 299 } 300 301 private static String coord_futureRange_sync(int startOffset, int endOffset, int instance) throws Exception { 302 final XLog LOG = XLog.getLog(CoordELFunctions.class); 303 final Thread currentThread = Thread.currentThread(); 304 ELEvaluator eval = ELEvaluator.getCurrent(); 305 String retVal = ""; 306 int datasetFrequency = (int) getDSFrequency();// in minutes 307 TimeUnit dsTimeUnit = getDSTimeUnit(); 308 int[] instCount = new int[1]; 309 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount); 310 StringBuilder resolvedInstances = new StringBuilder(); 311 StringBuilder resolvedURIPaths = new StringBuilder(); 312 if (nominalInstanceCal != null) { 313 Calendar initInstance = getInitialInstanceCal(); 314 nominalInstanceCal = (Calendar) initInstance.clone(); 315 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 316 317 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 318 if (ds == null) { 319 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 320 } 321 String uriTemplate = ds.getUriTemplate(); 322 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION); 323 if (conf == null) { 324 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION); 325 } 326 int available = 0, checkedInstance = 0; 327 boolean resolved = false; 328 String user = ParamChecker 329 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); 330 String doneFlag = ds.getDoneFlag(); 331 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 332 URIHandler uriHandler = null; 333 Context uriContext = null; 334 try { 335 while (instance >= checkedInstance && !currentThread.isInterrupted()) { 336 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal); 337 String uriPath = uriEval.evaluate(uriTemplate, String.class); 338 if (uriHandler == null) { 339 URI uri = new URI(uriPath); 340 uriHandler = uriService.getURIHandler(uri); 341 uriContext = uriHandler.getContext(uri, conf, user, true); 342 } 343 String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag); 344 if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) { 345 if (available == endOffset) { 346 LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag); 347 resolved = true; 348 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); 349 resolvedURIPaths.append(uriPath); 350 retVal = resolvedInstances.toString(); 351 eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString()); 352 break; 353 } 354 else if (available >= startOffset) { 355 LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag); 356 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append( 357 INSTANCE_SEPARATOR); 358 resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR); 359 360 } 361 available++; 362 } 363 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency); 364 nominalInstanceCal = (Calendar) initInstance.clone(); 365 instCount[0]++; 366 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 367 checkedInstance++; 368 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 369 } 370 if (!StringUtils.isEmpty(resolvedURIPaths.toString()) && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) { 371 eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString()); 372 } 373 374 } 375 finally { 376 if (uriContext != null) { 377 uriContext.destroy(); 378 } 379 } 380 if (!resolved) { 381 // return unchanged future function with variable 'is_resolved' 382 // to 'false' 383 eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE); 384 if (startOffset == endOffset) { 385 retVal = "${coord:future(" + startOffset + ", " + instance + ")}"; 386 } 387 else { 388 retVal = "${coord:futureRange(" + startOffset + ", " + endOffset + ", " + instance + ")}"; 389 } 390 } 391 else { 392 eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE); 393 } 394 } 395 else {// No feasible nominal time 396 eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE); 397 retVal = ""; 398 } 399 return retVal; 400 } 401 402 /** 403 * Return nominal time or Action Creation Time. 404 * <p> 405 * 406 * @return coordinator action creation or materialization date time 407 * @throws Exception if unable to format the Date object to String 408 */ 409 public static String ph2_coord_nominalTime() throws Exception { 410 ELEvaluator eval = ELEvaluator.getCurrent(); 411 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 412 "Coordinator Action"); 413 return DateUtils.formatDateOozieTZ(action.getNominalTime()); 414 } 415 416 public static String ph3_coord_nominalTime() throws Exception { 417 return ph2_coord_nominalTime(); 418 } 419 420 /** 421 * Convert from standard date-time formatting to a desired format. 422 * <p> 423 * @param dateTimeStr - A timestamp in standard (ISO8601) format. 424 * @param format - A string representing the desired format. 425 * @return coordinator action creation or materialization date time 426 * @throws Exception if unable to format the Date object to String 427 */ 428 public static String ph2_coord_formatTime(String dateTimeStr, String format) 429 throws Exception { 430 Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr); 431 return DateUtils.formatDateCustom(dateTime, format); 432 } 433 434 public static String ph3_coord_formatTime(String dateTimeStr, String format) 435 throws Exception { 436 return ph2_coord_formatTime(dateTimeStr, format); 437 } 438 439 /** 440 * Convert from standard date-time formatting to a Unix epoch time. 441 * <p/> 442 * @param dateTimeStr - A timestamp in standard (ISO8601) format. 443 * @param millis - "true" to include millis; otherwise will only include seconds 444 * @return coordinator action creation or materialization date time 445 * @throws Exception if unable to format the Date object to String 446 */ 447 public static String ph2_coord_epochTime(String dateTimeStr, String millis) 448 throws Exception { 449 Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr); 450 return DateUtils.formatDateEpoch(dateTime, Boolean.valueOf(millis)); 451 } 452 453 public static String ph3_coord_epochTime(String dateTimeStr, String millis) 454 throws Exception { 455 return ph2_coord_epochTime(dateTimeStr, millis); 456 } 457 458 /** 459 * Return Action Id. <p> 460 * 461 * @return coordinator action Id 462 */ 463 public static String ph2_coord_actionId() throws Exception { 464 ELEvaluator eval = ELEvaluator.getCurrent(); 465 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 466 "Coordinator Action"); 467 return action.getActionId(); 468 } 469 470 public static String ph3_coord_actionId() throws Exception { 471 return ph2_coord_actionId(); 472 } 473 474 /** 475 * Return Job Name. <p> 476 * 477 * @return coordinator name 478 */ 479 public static String ph2_coord_name() throws Exception { 480 ELEvaluator eval = ELEvaluator.getCurrent(); 481 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 482 "Coordinator Action"); 483 return action.getName(); 484 } 485 486 public static String ph3_coord_name() throws Exception { 487 return ph2_coord_name(); 488 } 489 490 /** 491 * Return Action Start time. <p> 492 * 493 * @return coordinator action start time 494 * @throws Exception if unable to format the Date object to String 495 */ 496 public static String ph2_coord_actualTime() throws Exception { 497 ELEvaluator eval = ELEvaluator.getCurrent(); 498 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 499 if (coordAction == null) { 500 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 501 } 502 return DateUtils.formatDateOozieTZ(coordAction.getActualTime()); 503 } 504 505 public static String ph3_coord_actualTime() throws Exception { 506 return ph2_coord_actualTime(); 507 } 508 509 /** 510 * Used to specify a list of URI's that are used as input dir to the workflow job. <p> Look for two evaluator-level 511 * variables <p> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p> A defines the current list of 512 * URI. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something 513 * unresolved, this function will echo back the original function <p> otherwise it sends the uris. 514 * 515 * @param dataInName : Datain name 516 * @return the list of URI's separated by INSTANCE_SEPARATOR <p> if there are unresolved EL function (i.e. latest) 517 * , echo back <p> the function without resolving the function. 518 */ 519 public static String ph3_coord_dataIn(String dataInName) { 520 String uris = ""; 521 ELEvaluator eval = ELEvaluator.getCurrent(); 522 if (eval.getVariable(".datain." + dataInName) == null 523 && (eval.getVariable(".actionInputLogic") != null && !StringUtils.isEmpty(eval.getVariable( 524 ".actionInputLogic").toString()))) { 525 try { 526 return new CoordInputLogicEvaluatorUtil().getInputDependencies(dataInName, 527 (SyncCoordAction) eval.getVariable(COORD_ACTION)); 528 } 529 catch (JDOMException e) { 530 XLog.getLog(CoordELFunctions.class).error(e); 531 throw new RuntimeException(e.getMessage()); 532 } 533 } 534 535 uris = (String) eval.getVariable(".datain." + dataInName); 536 Object unResolvedObj = eval.getVariable(".datain." + dataInName + ".unresolved"); 537 if (unResolvedObj == null) { 538 return uris; 539 } 540 Boolean unresolved = Boolean.parseBoolean(unResolvedObj.toString()); 541 if (unresolved != null && unresolved.booleanValue() == true) { 542 return "${coord:dataIn('" + dataInName + "')}"; 543 } 544 return uris; 545 } 546 547 /** 548 * Used to specify a list of URI's that are output dir of the workflow job. <p> Look for one evaluator-level 549 * variable <p> dataout.<DATAOUT_NAME> <p> It defines the current list of URI. <p> otherwise it sends the uris. 550 * 551 * @param dataOutName : Dataout name 552 * @return the list of URI's separated by INSTANCE_SEPARATOR 553 */ 554 public static String ph3_coord_dataOut(String dataOutName) { 555 String uris = ""; 556 ELEvaluator eval = ELEvaluator.getCurrent(); 557 uris = (String) eval.getVariable(".dataout." + dataOutName); 558 return uris; 559 } 560 561 /** 562 * Determine the date-time in Oozie processing timezone of n-th dataset instance. <p> It depends on: <p> 1. 563 * Data set frequency <p> 2. 564 * Data set Time unit (day, month, minute) <p> 3. Data set Time zone/DST <p> 4. End Day/Month flag <p> 5. Data 565 * set initial instance <p> 6. Action Creation Time 566 * 567 * @param n instance count domain: n is integer 568 * @return date-time in Oozie processing timezone of the n-th instance returns 'null' means n-th instance is 569 * earlier than Initial-Instance of DS 570 * @throws Exception 571 */ 572 public static String ph2_coord_current(int n) throws Exception { 573 if (isSyncDataSet()) { // For Sync Dataset 574 return coord_current_sync(n); 575 } 576 else { 577 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 578 } 579 } 580 581 /** 582 * Determine the date-time in Oozie processing timezone of current dataset instances 583 * from start to end offsets from the nominal time. <p> It depends 584 * on: <p> 1. Data set frequency <p> 2. Data set Time unit (day, month, minute) <p> 3. Data set Time zone/DST 585 * <p> 4. End Day/Month flag <p> 5. Data set initial instance <p> 6. Action Creation Time 586 * 587 * @param start :start instance offset <p> domain: start <= 0, start is integer 588 * @param end :end instance offset <p> domain: end <= 0, end is integer 589 * @return date-time in Oozie processing timezone of the instances from start to end offsets 590 * delimited by comma. <p> If the current instance time of the dataset based on the Action Creation Time 591 * is earlier than the Initial-Instance of DS an empty string is returned. 592 * If an instance within the range is earlier than Initial-Instance of DS that instance is ignored 593 * @throws Exception 594 */ 595 public static String ph2_coord_currentRange(int start, int end) throws Exception { 596 if (isSyncDataSet()) { // For Sync Dataset 597 return coord_currentRange_sync(start, end); 598 } 599 else { 600 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 601 } 602 } 603 /** 604 * Determine the date-time in Oozie processing timezone of the given offset from the dataset effective nominal time. <p> It 605 * depends on: <p> 1. Data set frequency <p> 2. Data set Time Unit <p> 3. Data set Time zone/DST 606 * <p> 4. Data set initial instance <p> 5. Action Creation Time 607 * 608 * @param n offset amount (integer) 609 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR") 610 * @return date-time in Oozie processing timezone of the given offset from the dataset effective nominal time 611 * @throws Exception if there was a problem formatting 612 */ 613 public static String ph2_coord_offset(int n, String timeUnit) throws Exception { 614 if (isSyncDataSet()) { // For Sync Dataset 615 return coord_offset_sync(n, timeUnit); 616 } 617 else { 618 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 619 } 620 } 621 622 /** 623 * Determine how many hours is on the date of n-th dataset instance. <p> It depends on: <p> 1. Data set frequency 624 * <p> 2. Data set Time unit (day, month, minute) <p> 3. Data set Time zone/DST <p> 4. End Day/Month flag <p> 5. 625 * Data set initial instance <p> 6. Action Creation Time 626 * 627 * @param n instance count <p> domain: n is integer 628 * @return number of hours on that day <p> returns -1 means n-th instance is earlier than Initial-Instance of DS 629 * @throws Exception 630 */ 631 public static int ph2_coord_hoursInDay(int n) throws Exception { 632 int datasetFrequency = (int) getDSFrequency(); 633 // /Calendar nominalInstanceCal = 634 // getCurrentInstance(getActionCreationtime()); 635 Calendar nominalInstanceCal = getEffectiveNominalTime(); 636 if (nominalInstanceCal == null) { 637 return -1; 638 } 639 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n); 640 /* 641 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) 642 * { return -1; } 643 */ 644 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ 645 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 646 return DateUtils.hoursInDay(nominalInstanceCal); 647 } 648 649 public static int ph3_coord_hoursInDay(int n) throws Exception { 650 return ph2_coord_hoursInDay(n); 651 } 652 653 /** 654 * Calculate number of days in one month for n-th dataset instance. <p> It depends on: <p> 1. Data set frequency . 655 * <p> 2. Data set Time unit (day, month, minute) <p> 3. Data set Time zone/DST <p> 4. End Day/Month flag <p> 5. 656 * Data set initial instance <p> 6. Action Creation Time 657 * 658 * @param n instance count. domain: n is integer 659 * @return number of days in that month <p> returns -1 means n-th instance is earlier than Initial-Instance of DS 660 * @throws Exception 661 */ 662 public static int ph2_coord_daysInMonth(int n) throws Exception { 663 int datasetFrequency = (int) getDSFrequency();// in minutes 664 // Calendar nominalInstanceCal = 665 // getCurrentInstance(getActionCreationtime()); 666 Calendar nominalInstanceCal = getEffectiveNominalTime(); 667 if (nominalInstanceCal == null) { 668 return -1; 669 } 670 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n); 671 /* 672 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) 673 * { return -1; } 674 */ 675 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ 676 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 677 return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH); 678 } 679 680 public static int ph3_coord_daysInMonth(int n) throws Exception { 681 return ph2_coord_daysInMonth(n); 682 } 683 684 /** 685 * Determine the date-time in Oozie processing timezone of n-th latest available dataset instance. <p> It depends 686 * on: <p> 1. Data set frequency <p> 2. Data set Time unit (day, month, minute) <p> 3. Data set Time zone/DST 687 * <p> 4. End Day/Month flag <p> 5. Data set initial instance <p> 6. Action Creation Time <p> 7. Existence of 688 * dataset's directory 689 * 690 * @param n :instance count <p> domain: n <= 0, n is integer 691 * @return date-time in Oozie processing timezone of the n-th instance <p> returns 'null' means n-th instance is 692 * earlier than Initial-Instance of DS 693 * @throws Exception 694 */ 695 public static String ph3_coord_latest(int n) throws Exception { 696 ParamChecker.checkLEZero(n, "latest:n"); 697 if (isSyncDataSet()) {// For Sync Dataset 698 return coord_latest_sync(n); 699 } 700 else { 701 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 702 } 703 } 704 705 /** 706 * Determine the date-time in Oozie processing timezone of latest available dataset instances 707 * from start to end offsets from the nominal time. <p> It depends 708 * on: <p> 1. Data set frequency <p> 2. Data set Time unit (day, month, minute) <p> 3. Data set Time zone/DST 709 * <p> 4. End Day/Month flag <p> 5. Data set initial instance <p> 6. Action Creation Time <p> 7. Existence of 710 * dataset's directory 711 * 712 * @param start :start instance offset <p> domain: start <= 0, start is integer 713 * @param end :end instance offset <p> domain: end <= 0, end is integer 714 * @return date-time in Oozie processing timezone of the instances from start to end offsets 715 * delimited by comma. <p> returns 'null' means start offset instance is 716 * earlier than Initial-Instance of DS 717 * @throws Exception 718 */ 719 public static String ph3_coord_latestRange(int start, int end) throws Exception { 720 ParamChecker.checkLEZero(start, "latest:n"); 721 ParamChecker.checkLEZero(end, "latest:n"); 722 if (isSyncDataSet()) {// For Sync Dataset 723 return coord_latestRange_sync(start, end); 724 } 725 else { 726 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 727 } 728 } 729 730 /** 731 * Configure an evaluator with data set and application specific information. <p> Helper method of associating 732 * dataset and application object 733 * 734 * @param evaluator : to set variables 735 * @param ds : Data Set object 736 * @param coordAction : Application instance 737 */ 738 public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) { 739 evaluator.setVariable(COORD_ACTION, coordAction); 740 evaluator.setVariable(DATASET, ds); 741 } 742 743 /** 744 * Helper method to wrap around with "${..}". <p> 745 * 746 * 747 * @param eval :EL evaluator 748 * @param expr : expression to evaluate 749 * @return Resolved expression or echo back the same expression 750 * @throws Exception 751 */ 752 public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception { 753 try { 754 eval.setVariable(".wrap", null); 755 String result = eval.evaluate(expr, String.class); 756 if (eval.getVariable(".wrap") != null) { 757 return "${" + result + "}"; 758 } 759 else { 760 return result; 761 } 762 } 763 catch (Exception e) { 764 throw new Exception("Unable to evaluate :" + expr + ":\n", e); 765 } 766 } 767 768 // Set of echo functions 769 770 public static String ph1_coord_current_echo(String n) { 771 return echoUnResolved("current", n); 772 } 773 774 public static String ph1_coord_absolute_echo(String date) { 775 return echoUnResolved("absolute", date); 776 } 777 778 public static String ph1_coord_currentRange_echo(String start, String end) { 779 return echoUnResolved("currentRange", start + ", " + end); 780 } 781 782 public static String ph1_coord_offset_echo(String n, String timeUnit) { 783 return echoUnResolved("offset", n + " , " + timeUnit); 784 } 785 786 public static String ph2_coord_current_echo(String n) { 787 return echoUnResolved("current", n); 788 } 789 790 public static String ph2_coord_currentRange_echo(String start, String end) { 791 return echoUnResolved("currentRange", start + ", " + end); 792 } 793 794 public static String ph2_coord_offset_echo(String n, String timeUnit) { 795 return echoUnResolved("offset", n + " , " + timeUnit); 796 } 797 798 public static String ph2_coord_absolute_echo(String date) { 799 return echoUnResolved("absolute", date); 800 } 801 802 public static String ph2_coord_absolute_range(String startInstance, int end) throws Exception { 803 int[] instanceCount = new int[1]; 804 805 // getCurrentInstance() returns null, which means startInstance is less 806 // than initial instance 807 if (getCurrentInstance(DateUtils.getCalendar(startInstance).getTime(), instanceCount) == null) { 808 throw new CommandException(ErrorCode.E1010, 809 "intial-instance should be equal or earlier than the start-instance. intial-instance is " 810 + getInitialInstance() + " and start-instance is " + startInstance); 811 } 812 int[] nominalCount = new int[1]; 813 if (getCurrentInstance(getActionCreationtime(), nominalCount) == null) { 814 throw new CommandException(ErrorCode.E1010, 815 "intial-instance should be equal or earlier than the nominal time. intial-instance is " 816 + getInitialInstance() + " and nominal time is " + getActionCreationtime()); 817 } 818 // getCurrentInstance return offset relative to initial instance. 819 // start instance offset - nominal offset = start offset relative to 820 // nominal time-stamp. 821 int start = instanceCount[0] - nominalCount[0]; 822 if (start > end) { 823 throw new CommandException(ErrorCode.E1010, 824 "start-instance should be equal or earlier than the end-instance. startInstance is " 825 + startInstance + " which is equivalent to current (" + instanceCount[0] 826 + ") but end is specified as current (" + end + ")"); 827 } 828 return ph2_coord_currentRange(start, end); 829 } 830 831 public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) { 832 return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit); 833 } 834 835 public static String ph1_coord_dateTzOffset_echo(String n, String timezone) { 836 return echoUnResolved("dateTzOffset", n + " , " + timezone); 837 } 838 839 public static String ph1_coord_epochTime_echo(String dateTime, String millis) { 840 // Quote the dateTime value since it would contain a ':'. 841 return echoUnResolved("epochTime", "'"+dateTime+"'" + " , " + millis); 842 } 843 844 public static String ph1_coord_formatTime_echo(String dateTime, String format) { 845 // Quote the dateTime value since it would contain a ':'. 846 return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format); 847 } 848 849 public static String ph1_coord_latest_echo(String n) { 850 return echoUnResolved("latest", n); 851 } 852 853 public static String ph2_coord_latest_echo(String n) { 854 return ph1_coord_latest_echo(n); 855 } 856 857 public static String ph1_coord_future_echo(String n, String instance) { 858 return echoUnResolved("future", n + ", " + instance + ""); 859 } 860 861 public static String ph2_coord_future_echo(String n, String instance) { 862 return ph1_coord_future_echo(n, instance); 863 } 864 865 public static String ph1_coord_latestRange_echo(String start, String end) { 866 return echoUnResolved("latestRange", start + ", " + end); 867 } 868 869 public static String ph2_coord_latestRange_echo(String start, String end) { 870 return ph1_coord_latestRange_echo(start, end); 871 } 872 873 public static String ph1_coord_futureRange_echo(String start, String end, String instance) { 874 return echoUnResolved("futureRange", start + ", " + end + ", " + instance); 875 } 876 877 public static String ph2_coord_futureRange_echo(String start, String end, String instance) { 878 return ph1_coord_futureRange_echo(start, end, instance); 879 } 880 881 public static String ph1_coord_dataIn_echo(String n) { 882 ELEvaluator eval = ELEvaluator.getCurrent(); 883 String val = (String) eval.getVariable("oozie.dataname." + n); 884 if ((val == null || val.equals("data-in") == false)) { 885 XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid"); 886 throw new RuntimeException("data_in_name " + n + " is not valid"); 887 } 888 return echoUnResolved("dataIn", "'" + n + "'"); 889 } 890 891 public static String ph1_coord_dataOut_echo(String n) { 892 ELEvaluator eval = ELEvaluator.getCurrent(); 893 String val = (String) eval.getVariable("oozie.dataname." + n); 894 if (val == null || val.equals("data-out") == false) { 895 XLog.getLog(CoordELFunctions.class).error("data_out_name " + n + " is not valid"); 896 throw new RuntimeException("data_out_name " + n + " is not valid"); 897 } 898 return echoUnResolved("dataOut", "'" + n + "'"); 899 } 900 901 public static String ph1_coord_nominalTime_echo() { 902 return echoUnResolved("nominalTime", ""); 903 } 904 905 public static String ph1_coord_nominalTime_echo_wrap() { 906 // return "${coord:nominalTime()}"; // no resolution 907 return echoUnResolved("nominalTime", ""); 908 } 909 910 public static String ph1_coord_nominalTime_echo_fixed() { 911 return "2009-03-06T010:00"; // Dummy resolution 912 } 913 914 public static String ph1_coord_actualTime_echo_wrap() { 915 // return "${coord:actualTime()}"; // no resolution 916 return echoUnResolved("actualTime", ""); 917 } 918 919 public static String ph1_coord_actionId_echo() { 920 return echoUnResolved("actionId", ""); 921 } 922 923 public static String ph1_coord_name_echo() { 924 return echoUnResolved("name", ""); 925 } 926 927 // The following echo functions are not used in any phases yet 928 // They are here for future purpose. 929 public static String coord_minutes_echo(String n) { 930 return echoUnResolved("minutes", n); 931 } 932 933 public static String coord_hours_echo(String n) { 934 return echoUnResolved("hours", n); 935 } 936 937 public static String coord_days_echo(String n) { 938 return echoUnResolved("days", n); 939 } 940 941 public static String coord_endOfDay_echo(String n) { 942 return echoUnResolved("endOfDay", n); 943 } 944 945 public static String coord_months_echo(String n) { 946 return echoUnResolved("months", n); 947 } 948 949 public static String coord_endOfMonth_echo(String n) { 950 return echoUnResolved("endOfMonth", n); 951 } 952 953 public static String coord_actualTime_echo() { 954 return echoUnResolved("actualTime", ""); 955 } 956 957 // This echo function will always return "24" for validation only. 958 // This evaluation ****should not**** replace the original XML 959 // Create a temporary string and validate the function 960 // This is **required** for evaluating an expression like 961 // coord:HoursInDay(0) + 3 962 // actual evaluation will happen in phase 2 or phase 3. 963 public static String ph1_coord_hoursInDay_echo(String n) { 964 return "24"; 965 // return echoUnResolved("hoursInDay", n); 966 } 967 968 // This echo function will always return "30" for validation only. 969 // This evaluation ****should not**** replace the original XML 970 // Create a temporary string and validate the function 971 // This is **required** for evaluating an expression like 972 // coord:daysInMonth(0) + 3 973 // actual evaluation will happen in phase 2 or phase 3. 974 public static String ph1_coord_daysInMonth_echo(String n) { 975 // return echoUnResolved("daysInMonth", n); 976 return "30"; 977 } 978 979 // This echo function will always return "3" for validation only. 980 // This evaluation ****should not**** replace the original XML 981 // Create a temporary string and validate the function 982 // This is **required** for evaluating an expression like coord:tzOffset + 2 983 // actual evaluation will happen in phase 2 or phase 3. 984 public static String ph1_coord_tzOffset_echo() { 985 // return echoUnResolved("tzOffset", ""); 986 return "3"; 987 } 988 989 // Local methods 990 /** 991 * @param n 992 * @return n-th instance Date-Time from current instance for data-set <p> return empty string ("") if the 993 * Action_Creation_time or the n-th instance <p> is earlier than the Initial_Instance of dataset. 994 * @throws Exception 995 */ 996 private static String coord_current_sync(int n) throws Exception { 997 return coord_currentRange_sync(n, n); 998 } 999 1000 private static String coord_currentRange_sync(int start, int end) throws Exception { 1001 final XLog LOG = XLog.getLog(CoordELFunctions.class); 1002 int datasetFrequency = getDSFrequency();// in minutes 1003 TimeUnit dsTimeUnit = getDSTimeUnit(); 1004 int[] instCount = new int[1];// used as pass by ref 1005 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount); 1006 if (nominalInstanceCal == null) { 1007 LOG.warn("If the initial instance of the dataset is later than the nominal time, an empty string is" 1008 + " returned. This means that no data is available at the current-instance specified by the user" 1009 + " and the user could try modifying his initial-instance to an earlier time."); 1010 return ""; 1011 } else { 1012 Calendar initInstance = getInitialInstanceCal(); 1013 // Add in the reverse order - newest instance first. 1014 nominalInstanceCal = (Calendar) initInstance.clone(); 1015 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), (instCount[0] + start) * datasetFrequency); 1016 List<String> instances = new ArrayList<String>(); 1017 for (int i = start; i <= end; i++) { 1018 if (nominalInstanceCal.compareTo(initInstance) < 0) { 1019 LOG.warn("If the initial instance of the dataset is later than the current-instance specified," 1020 + " such as coord:current({0}) in this case, an empty string is returned. This means that" 1021 + " no data is available at the current-instance specified by the user and the user could" 1022 + " try modifying his initial-instance to an earlier time.", start); 1023 } 1024 else { 1025 instances.add(DateUtils.formatDateOozieTZ(nominalInstanceCal)); 1026 } 1027 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency); 1028 } 1029 instances = Lists.reverse(instances); 1030 return StringUtils.join(instances, CoordELFunctions.INSTANCE_SEPARATOR); 1031 } 1032 } 1033 1034 /** 1035 * 1036 * @param n offset amount (integer) 1037 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR") 1038 * @return the offset time from the effective nominal time <p> return empty string ("") if the Action_Creation_time or the 1039 * offset instance <p> is earlier than the Initial_Instance of dataset. 1040 * @throws Exception 1041 */ 1042 private static String coord_offset_sync(int n, String timeUnit) throws Exception { 1043 Calendar rawCal = resolveOffsetRawTime(n, TimeUnit.valueOf(timeUnit), null); 1044 if (rawCal == null) { 1045 // warning already logged by resolveOffsetRawTime() 1046 return ""; 1047 } 1048 1049 int freq = getDSFrequency(); 1050 TimeUnit freqUnit = getDSTimeUnit(); 1051 int freqCount = 0; 1052 // We're going to manually turn back/forward cal by decrements/increments of freq and then check that it gives the same 1053 // time as rawCal; this is to check that the offset time resolves to a frequency offset of the effective nominal time 1054 // In other words, that there exists an integer x, such that coord:offset(n, timeUnit) == coord:current(x) is true 1055 // If not, then we'll "rewind" rawCal to the latest instance earlier than rawCal and use that. 1056 Calendar cal = getInitialInstanceCal(); 1057 if (rawCal.before(cal)) { 1058 while (cal.after(rawCal)) { 1059 cal.add(freqUnit.getCalendarUnit(), -freq); 1060 freqCount--; 1061 } 1062 } 1063 else if (rawCal.after(cal)) { 1064 while (cal.before(rawCal)) { 1065 cal.add(freqUnit.getCalendarUnit(), freq); 1066 freqCount++; 1067 } 1068 } 1069 if (cal.before(rawCal)) { 1070 rawCal = cal; 1071 } 1072 else if (cal.after(rawCal)) { 1073 cal.add(freqUnit.getCalendarUnit(), -freq); 1074 rawCal = cal; 1075 freqCount--; 1076 } 1077 String rawCalStr = DateUtils.formatDateOozieTZ(rawCal); 1078 1079 Calendar nominalInstanceCal = getInitialInstanceCal(); 1080 nominalInstanceCal.add(freqUnit.getCalendarUnit(), freq * freqCount); 1081 if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) { 1082 XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the offset instance" 1083 + " specified, such as coord:offset({0}, {1}) in this case, an empty string is returned. This means that no" 1084 + " data is available at the offset instance specified by the user and the user could try modifying his" 1085 + " initial-instance to an earlier time.", n, timeUnit); 1086 return ""; 1087 } 1088 String nominalCalStr = DateUtils.formatDateOozieTZ(nominalInstanceCal); 1089 1090 if (!rawCalStr.equals(nominalCalStr)) { 1091 throw new RuntimeException("Shouldn't happen"); 1092 } 1093 return rawCalStr; 1094 } 1095 1096 /** 1097 * @param offset 1098 * @return n-th available latest instance Date-Time for SYNC data-set 1099 * @throws Exception 1100 */ 1101 private static String coord_latest_sync(int offset) throws Exception { 1102 return coord_latestRange_sync(offset, offset); 1103 } 1104 1105 private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception { 1106 final XLog LOG = XLog.getLog(CoordELFunctions.class); 1107 final Thread currentThread = Thread.currentThread(); 1108 ELEvaluator eval = ELEvaluator.getCurrent(); 1109 String retVal = ""; 1110 int datasetFrequency = (int) getDSFrequency();// in minutes 1111 TimeUnit dsTimeUnit = getDSTimeUnit(); 1112 int[] instCount = new int[1]; 1113 boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false); 1114 Calendar nominalInstanceCal; 1115 if (useCurrentTime) { 1116 nominalInstanceCal = getCurrentInstance(new Date(), instCount); 1117 } 1118 else { 1119 nominalInstanceCal = getCurrentInstance(getActualTime(), instCount); 1120 } 1121 StringBuilder resolvedInstances = new StringBuilder(); 1122 StringBuilder resolvedURIPaths = new StringBuilder(); 1123 if (nominalInstanceCal != null) { 1124 Calendar initInstance = getInitialInstanceCal(); 1125 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1126 if (ds == null) { 1127 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1128 } 1129 String uriTemplate = ds.getUriTemplate(); 1130 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION); 1131 if (conf == null) { 1132 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION); 1133 } 1134 int available = 0; 1135 boolean resolved = false; 1136 String user = ParamChecker 1137 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); 1138 String doneFlag = ds.getDoneFlag(); 1139 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 1140 URIHandler uriHandler = null; 1141 Context uriContext = null; 1142 try { 1143 while (nominalInstanceCal.compareTo(initInstance) >= 0 && !currentThread.isInterrupted()) { 1144 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal); 1145 String uriPath = uriEval.evaluate(uriTemplate, String.class); 1146 if (uriHandler == null) { 1147 URI uri = new URI(uriPath); 1148 uriHandler = uriService.getURIHandler(uri); 1149 uriContext = uriHandler.getContext(uri, conf, user, true); 1150 } 1151 String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag); 1152 if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) { 1153 XLog.getLog(CoordELFunctions.class) 1154 .debug("Found latest(" + available + "): " + uriWithDoneFlag); 1155 if (available == startOffset) { 1156 LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag); 1157 resolved = true; 1158 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); 1159 resolvedURIPaths.append(uriPath); 1160 retVal = resolvedInstances.toString(); 1161 eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString()); 1162 1163 break; 1164 } 1165 else if (available <= endOffset) { 1166 LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag); 1167 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append( 1168 INSTANCE_SEPARATOR); 1169 resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR); 1170 } 1171 1172 available--; 1173 } 1174 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), -datasetFrequency); 1175 nominalInstanceCal = (Calendar) initInstance.clone(); 1176 instCount[0]--; 1177 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 1178 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 1179 } 1180 if (!StringUtils.isEmpty(resolvedURIPaths.toString()) && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) { 1181 eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString()); 1182 } 1183 } 1184 finally { 1185 if (uriContext != null) { 1186 uriContext.destroy(); 1187 } 1188 } 1189 if (!resolved) { 1190 // return unchanged latest function with variable 'is_resolved' 1191 // to 'false' 1192 eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE); 1193 if (startOffset == endOffset) { 1194 retVal = "${coord:latest(" + startOffset + ")}"; 1195 } 1196 else { 1197 retVal = "${coord:latestRange(" + startOffset + "," + endOffset + ")}"; 1198 } 1199 } 1200 else { 1201 eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE); 1202 } 1203 } 1204 else {// No feasible nominal time 1205 eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE); 1206 } 1207 return retVal; 1208 } 1209 1210 /** 1211 * @param tm 1212 * @return a new Evaluator to be used for URI-template evaluation 1213 */ 1214 private static ELEvaluator getUriEvaluator(Calendar tm) { 1215 tm.setTimeZone(DateUtils.getOozieProcessingTimeZone()); 1216 ELEvaluator retEval = new ELEvaluator(); 1217 retEval.setVariable("YEAR", tm.get(Calendar.YEAR)); 1218 retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm 1219 .get(Calendar.MONTH) + 1)); 1220 retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm 1221 .get(Calendar.DAY_OF_MONTH)); 1222 retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm 1223 .get(Calendar.HOUR_OF_DAY)); 1224 retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? "0" + tm.get(Calendar.MINUTE) : tm 1225 .get(Calendar.MINUTE)); 1226 return retEval; 1227 } 1228 1229 /** 1230 * @return whether a data set is SYNCH or ASYNC 1231 */ 1232 private static boolean isSyncDataSet() { 1233 ELEvaluator eval = ELEvaluator.getCurrent(); 1234 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1235 if (ds == null) { 1236 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1237 } 1238 return ds.getType().equalsIgnoreCase("SYNC"); 1239 } 1240 1241 /** 1242 * Check whether a function should be resolved. 1243 * 1244 * @param functionName 1245 * @param n 1246 * @return null if the functionName needs to be resolved otherwise return the calling function unresolved. 1247 */ 1248 private static String checkIfResolved(String functionName, String n) { 1249 ELEvaluator eval = ELEvaluator.getCurrent(); 1250 String replace = (String) eval.getVariable("resolve_" + functionName); 1251 if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't 1252 // resolve 1253 // return "${coord:" + functionName + "(" + n +")}"; //Unresolved 1254 eval.setVariable(".wrap", "true"); 1255 return "coord:" + functionName + "(" + n + ")"; // Unresolved 1256 } 1257 return null; // Resolved it 1258 } 1259 1260 private static String echoUnResolved(String functionName, String n) { 1261 return echoUnResolvedPre(functionName, n, "coord:"); 1262 } 1263 1264 private static String echoUnResolvedPre(String functionName, String n, String prefix) { 1265 ELEvaluator eval = ELEvaluator.getCurrent(); 1266 eval.setVariable(".wrap", "true"); 1267 return prefix + functionName + "(" + n + ")"; // Unresolved 1268 } 1269 1270 /** 1271 * @return the initial instance of a DataSet in DATE 1272 */ 1273 private static Date getInitialInstance() { 1274 ELEvaluator eval = ELEvaluator.getCurrent(); 1275 return getInitialInstance(eval); 1276 } 1277 1278 /** 1279 * @return the initial instance of a DataSet in DATE 1280 */ 1281 private static Date getInitialInstance(ELEvaluator eval) { 1282 return getInitialInstanceCal(eval).getTime(); 1283 // return ds.getInitInstance(); 1284 } 1285 1286 /** 1287 * @return the initial instance of a DataSet in Calendar 1288 */ 1289 private static Calendar getInitialInstanceCal() { 1290 ELEvaluator eval = ELEvaluator.getCurrent(); 1291 return getInitialInstanceCal(eval); 1292 } 1293 1294 /** 1295 * @return the initial instance of a DataSet in Calendar 1296 */ 1297 private static Calendar getInitialInstanceCal(ELEvaluator eval) { 1298 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1299 if (ds == null) { 1300 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1301 } 1302 Calendar effInitTS = new GregorianCalendar(ds.getTimeZone()); 1303 effInitTS.setTime(ds.getInitInstance()); 1304 // To adjust EOD/EOM 1305 DateUtils.moveToEnd(effInitTS, getDSEndOfFlag(eval)); 1306 return effInitTS; 1307 // return ds.getInitInstance(); 1308 } 1309 1310 /** 1311 * @return Nominal or action creation Time when all the dependencies of an application instance are met. 1312 */ 1313 private static Date getActionCreationtime() { 1314 ELEvaluator eval = ELEvaluator.getCurrent(); 1315 return getActionCreationtime(eval); 1316 } 1317 1318 /** 1319 * @return Nominal or action creation Time when all the dependencies of an application instance are met. 1320 */ 1321 private static Date getActionCreationtime(ELEvaluator eval) { 1322 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 1323 if (coordAction == null) { 1324 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 1325 } 1326 return coordAction.getNominalTime(); 1327 } 1328 1329 /** 1330 * @return Actual Time when all the dependencies of an application instance are met. 1331 */ 1332 private static Date getActualTime() { 1333 ELEvaluator eval = ELEvaluator.getCurrent(); 1334 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 1335 if (coordAction == null) { 1336 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 1337 } 1338 return coordAction.getActualTime(); 1339 } 1340 1341 /** 1342 * @return TimeZone for the application or job. 1343 */ 1344 private static TimeZone getJobTZ() { 1345 ELEvaluator eval = ELEvaluator.getCurrent(); 1346 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 1347 if (coordAction == null) { 1348 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 1349 } 1350 return coordAction.getTimeZone(); 1351 } 1352 1353 /** 1354 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time) 1355 * 1356 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of 1357 * the dataset. 1358 */ 1359 public static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) { 1360 ELEvaluator eval = ELEvaluator.getCurrent(); 1361 return getCurrentInstance(effectiveTime, instanceCount, eval); 1362 } 1363 1364 /** 1365 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time) 1366 * 1367 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of 1368 * the dataset. 1369 */ 1370 private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[], ELEvaluator eval) { 1371 Date datasetInitialInstance = getInitialInstance(eval); 1372 TimeUnit dsTimeUnit = getDSTimeUnit(eval); 1373 TimeZone dsTZ = getDatasetTZ(eval); 1374 int dsFreq = getDSFrequency(eval); 1375 // Convert Date to Calendar for corresponding TZ 1376 Calendar current = Calendar.getInstance(dsTZ); 1377 current.setTime(datasetInitialInstance); 1378 1379 Calendar calEffectiveTime = new GregorianCalendar(dsTZ); 1380 calEffectiveTime.setTime(effectiveTime); 1381 if (instanceCount == null) { // caller doesn't care about this value 1382 instanceCount = new int[1]; 1383 } 1384 instanceCount[0] = 0; 1385 if (current.compareTo(calEffectiveTime) > 0) { 1386 return null; 1387 } 1388 1389 switch(dsTimeUnit) { 1390 case MINUTE: 1391 instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / MINUTE_MSEC); 1392 break; 1393 case HOUR: 1394 instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / HOUR_MSEC); 1395 break; 1396 case DAY: 1397 case END_OF_DAY: 1398 instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / DAY_MSEC); 1399 break; 1400 case MONTH: 1401 case END_OF_MONTH: 1402 int diffYear = calEffectiveTime.get(Calendar.YEAR) - current.get(Calendar.YEAR); 1403 instanceCount[0] = diffYear * 12 + calEffectiveTime.get(Calendar.MONTH) - current.get(Calendar.MONTH); 1404 break; 1405 case YEAR: 1406 instanceCount[0] = calEffectiveTime.get(Calendar.YEAR) - current.get(Calendar.YEAR); 1407 break; 1408 default: 1409 throw new IllegalArgumentException("Unhandled dataset time unit " + dsTimeUnit); 1410 } 1411 1412 if (instanceCount[0] > 2) { 1413 instanceCount[0] = (instanceCount[0] / dsFreq); 1414 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq); 1415 } else { 1416 instanceCount[0] = 0; 1417 } 1418 while (!current.getTime().after(effectiveTime)) { 1419 current.add(dsTimeUnit.getCalendarUnit(), dsFreq); 1420 instanceCount[0]++; 1421 } 1422 current.add(dsTimeUnit.getCalendarUnit(), -dsFreq); 1423 instanceCount[0]--; 1424 return current; 1425 } 1426 1427 /** 1428 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time) 1429 * 1430 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of 1431 * the dataset. 1432 */ 1433 private static Calendar getCurrentInstance_old(Date effectiveTime, int instanceCount[], ELEvaluator eval) { 1434 Date datasetInitialInstance = getInitialInstance(eval); 1435 TimeUnit dsTimeUnit = getDSTimeUnit(eval); 1436 TimeZone dsTZ = getDatasetTZ(eval); 1437 int dsFreq = getDSFrequency(eval); 1438 // Convert Date to Calendar for corresponding TZ 1439 Calendar current = Calendar.getInstance(); 1440 current.setTime(datasetInitialInstance); 1441 current.setTimeZone(dsTZ); 1442 1443 Calendar calEffectiveTime = Calendar.getInstance(); 1444 calEffectiveTime.setTime(effectiveTime); 1445 calEffectiveTime.setTimeZone(dsTZ); 1446 if (instanceCount == null) { // caller doesn't care about this value 1447 instanceCount = new int[1]; 1448 } 1449 instanceCount[0] = 0; 1450 if (current.compareTo(calEffectiveTime) > 0) { 1451 return null; 1452 } 1453 Calendar origCurrent = (Calendar) current.clone(); 1454 while (current.compareTo(calEffectiveTime) <= 0) { 1455 current = (Calendar) origCurrent.clone(); 1456 instanceCount[0]++; 1457 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq); 1458 } 1459 instanceCount[0]--; 1460 1461 current = (Calendar) origCurrent.clone(); 1462 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq); 1463 return current; 1464 } 1465 1466 public static Calendar getEffectiveNominalTime() { 1467 Date datasetInitialInstance = getInitialInstance(); 1468 TimeZone dsTZ = getDatasetTZ(); 1469 // Convert Date to Calendar for corresponding TZ 1470 Calendar current = Calendar.getInstance(); 1471 current.setTime(datasetInitialInstance); 1472 current.setTimeZone(dsTZ); 1473 1474 Calendar calEffectiveTime = Calendar.getInstance(); 1475 calEffectiveTime.setTime(getActionCreationtime()); 1476 calEffectiveTime.setTimeZone(dsTZ); 1477 if (current.compareTo(calEffectiveTime) > 0) { 1478 // Nominal Time < initial Instance 1479 // TODO: getClass() call doesn't work from static method. 1480 // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+ 1481 // current.getTime()); 1482 return null; 1483 } 1484 return calEffectiveTime; 1485 } 1486 1487 /** 1488 * @return dataset frequency in minutes 1489 */ 1490 private static int getDSFrequency() { 1491 ELEvaluator eval = ELEvaluator.getCurrent(); 1492 return getDSFrequency(eval); 1493 } 1494 1495 /** 1496 * @return dataset frequency in minutes 1497 */ 1498 private static int getDSFrequency(ELEvaluator eval) { 1499 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1500 if (ds == null) { 1501 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1502 } 1503 return ds.getFrequency(); 1504 } 1505 1506 /** 1507 * @return dataset TimeUnit 1508 */ 1509 private static TimeUnit getDSTimeUnit() { 1510 ELEvaluator eval = ELEvaluator.getCurrent(); 1511 return getDSTimeUnit(eval); 1512 } 1513 1514 /** 1515 * @return dataset TimeUnit 1516 */ 1517 public static TimeUnit getDSTimeUnit(ELEvaluator eval) { 1518 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1519 if (ds == null) { 1520 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1521 } 1522 return ds.getTimeUnit(); 1523 } 1524 1525 /** 1526 * @return dataset TimeZone 1527 */ 1528 public static TimeZone getDatasetTZ() { 1529 ELEvaluator eval = ELEvaluator.getCurrent(); 1530 return getDatasetTZ(eval); 1531 } 1532 1533 /** 1534 * @return dataset TimeZone 1535 */ 1536 private static TimeZone getDatasetTZ(ELEvaluator eval) { 1537 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1538 if (ds == null) { 1539 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1540 } 1541 return ds.getTimeZone(); 1542 } 1543 1544 /** 1545 * @return dataset TimeUnit 1546 */ 1547 private static TimeUnit getDSEndOfFlag() { 1548 ELEvaluator eval = ELEvaluator.getCurrent(); 1549 return getDSEndOfFlag(eval); 1550 } 1551 1552 /** 1553 * @return dataset TimeUnit 1554 */ 1555 private static TimeUnit getDSEndOfFlag(ELEvaluator eval) { 1556 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1557 if (ds == null) { 1558 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1559 } 1560 return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration(); 1561 } 1562 1563 /** 1564 * Return a job configuration property for the coordinator. 1565 * 1566 * @param property property name. 1567 * @return the value of the property, <code>null</code> if the property is undefined. 1568 */ 1569 public static String coord_conf(String property) { 1570 ELEvaluator eval = ELEvaluator.getCurrent(); 1571 return (String) eval.getVariable(property); 1572 } 1573 1574 /** 1575 * Return the user that submitted the coordinator job. 1576 * 1577 * @return the user that submitted the coordinator job. 1578 */ 1579 public static String coord_user() { 1580 ELEvaluator eval = ELEvaluator.getCurrent(); 1581 return (String) eval.getVariable(OozieClient.USER_NAME); 1582 } 1583 1584 /** 1585 * Takes two offset times and returns a list of multiples of the frequency offset from the effective nominal time that occur 1586 * between them. The caller should make sure that startCal is earlier than endCal. 1587 * <p> 1588 * As a simple example, assume its the same day: startCal is 1:00, endCal is 2:00, frequency is 20min, and effective nominal 1589 * time is 1:20 -- then this method would return a list containing: -20, 0, 20, 40, 60 1590 * 1591 * @param startCal The earlier offset time 1592 * @param endCal The later offset time 1593 * @param eval The ELEvaluator to use; cannot be null 1594 * @return A list of multiple of the frequency offset from the effective nominal time that occur between the startCal and endCal 1595 */ 1596 public static List<Integer> expandOffsetTimes(Calendar startCal, Calendar endCal, ELEvaluator eval) { 1597 List<Integer> expandedFreqs = new ArrayList<Integer>(); 1598 // Use eval because the "current" eval isn't set 1599 int freq = getDSFrequency(eval); 1600 TimeUnit freqUnit = getDSTimeUnit(eval); 1601 Calendar cal = getCurrentInstance(getActionCreationtime(eval), null, eval); 1602 int totalFreq = 0; 1603 if (startCal.before(cal)) { 1604 while (cal.after(startCal)) { 1605 cal.add(freqUnit.getCalendarUnit(), -freq); 1606 totalFreq += -freq; 1607 } 1608 if (cal.before(startCal)) { 1609 cal.add(freqUnit.getCalendarUnit(), freq); 1610 totalFreq += freq; 1611 } 1612 } 1613 else if (startCal.after(cal)) { 1614 while (cal.before(startCal)) { 1615 cal.add(freqUnit.getCalendarUnit(), freq); 1616 totalFreq += freq; 1617 } 1618 } 1619 // At this point, cal is the smallest multiple of the dataset frequency that is >= to the startCal and offset from the 1620 // effective nominal time. Now we can find all of the instances that occur between startCal and endCal, inclusive. 1621 while (cal.before(endCal) || cal.equals(endCal)) { 1622 expandedFreqs.add(totalFreq); 1623 cal.add(freqUnit.getCalendarUnit(), freq); 1624 totalFreq += freq; 1625 } 1626 return expandedFreqs; 1627 } 1628 1629 /** 1630 * Resolve the offset time from the effective nominal time 1631 * 1632 * @param n offset amount (integer) 1633 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR") 1634 * @param eval The ELEvaluator to use; or null to use the "current" eval 1635 * @return A Calendar of the offset time 1636 */ 1637 public static Calendar resolveOffsetRawTime(int n, TimeUnit timeUnit, ELEvaluator eval) { 1638 // Use eval if given (for when the "current" eval isn't set) 1639 Calendar cal; 1640 if (eval == null) { 1641 cal = getCurrentInstance(getActionCreationtime(), null); 1642 } 1643 else { 1644 cal = getCurrentInstance(getActionCreationtime(eval), null, eval); 1645 } 1646 if (cal == null) { 1647 XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the nominal time, an" 1648 + " empty string is returned. This means that no data is available at the offset instance specified by the user" 1649 + " and the user could try modifying his or her initial-instance to an earlier time."); 1650 return null; 1651 } 1652 cal.add(timeUnit.getCalendarUnit(), n); 1653 return cal; 1654 } 1655}