001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.coord; 019 020import com.google.common.collect.Lists; 021import org.apache.commons.lang.StringUtils; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.oozie.ErrorCode; 024import org.apache.oozie.client.OozieClient; 025import org.apache.oozie.command.CommandException; 026import org.apache.oozie.dependency.URIHandler; 027import org.apache.oozie.dependency.URIHandler.Context; 028import org.apache.oozie.service.Services; 029import org.apache.oozie.service.URIHandlerService; 030import org.apache.oozie.util.DateUtils; 031import org.apache.oozie.util.ELEvaluator; 032import org.apache.oozie.util.ParamChecker; 033import org.apache.oozie.util.XLog; 034 035import java.net.URI; 036import java.util.ArrayList; 037import java.util.Calendar; 038import java.util.Date; 039import java.util.GregorianCalendar; 040import java.util.List; 041import java.util.TimeZone; 042 043/** 044 * This class implements the EL function related to coordinator 045 */ 046 047public class CoordELFunctions { 048 final public static String DATASET = "oozie.coord.el.dataset.bean"; 049 final public static String COORD_ACTION = 
"oozie.coord.el.app.bean"; 050 final public static String CONFIGURATION = "oozie.coord.el.conf"; 051 final public static String LATEST_EL_USE_CURRENT_TIME = "oozie.service.ELService.latest-el.use-current-time"; 052 // INSTANCE_SEPARATOR is used to separate multiple directories into one tag. 053 final public static String INSTANCE_SEPARATOR = "#"; 054 final public static String DIR_SEPARATOR = ","; 055 // TODO: in next release, support flexibility 056 private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS"; 057 058 public static final long MINUTE_MSEC = 60 * 1000L; 059 public static final long HOUR_MSEC = 60 * MINUTE_MSEC; 060 public static final long DAY_MSEC = 24 * HOUR_MSEC; 061 public static final long MONTH_MSEC = 30 * DAY_MSEC; 062 public static final long YEAR_MSEC = 365 * DAY_MSEC; 063 064 /** 065 * Used in defining the frequency in 'day' unit. <p/> domain: <code> val > 0</code> and should be integer. 066 * 067 * @param val frequency in number of days. 068 * @return number of days and also set the frequency timeunit to "day" 069 */ 070 public static int ph1_coord_days(int val) { 071 val = ParamChecker.checkGTZero(val, "n"); 072 ELEvaluator eval = ELEvaluator.getCurrent(); 073 eval.setVariable("timeunit", TimeUnit.DAY); 074 eval.setVariable("endOfDuration", TimeUnit.NONE); 075 return val; 076 } 077 078 /** 079 * Used in defining the frequency in 'month' unit. <p/> domain: <code> val > 0</code> and should be integer. 080 * 081 * @param val frequency in number of months. 082 * @return number of months and also set the frequency timeunit to "month" 083 */ 084 public static int ph1_coord_months(int val) { 085 val = ParamChecker.checkGTZero(val, "n"); 086 ELEvaluator eval = ELEvaluator.getCurrent(); 087 eval.setVariable("timeunit", TimeUnit.MONTH); 088 eval.setVariable("endOfDuration", TimeUnit.NONE); 089 return val; 090 } 091 092 /** 093 * Used in defining the frequency in 'hour' unit. 
<p/> parameter value domain: <code> val > 0</code> and should 094 * be integer. 095 * 096 * @param val frequency in number of hours. 097 * @return number of minutes and also set the frequency timeunit to "minute" 098 */ 099 public static int ph1_coord_hours(int val) { 100 val = ParamChecker.checkGTZero(val, "n"); 101 ELEvaluator eval = ELEvaluator.getCurrent(); 102 eval.setVariable("timeunit", TimeUnit.MINUTE); 103 eval.setVariable("endOfDuration", TimeUnit.NONE); 104 return val * 60; 105 } 106 107 /** 108 * Used in defining the frequency in 'minute' unit. <p/> domain: <code> val > 0</code> and should be integer. 109 * 110 * @param val frequency in number of minutes. 111 * @return number of minutes and also set the frequency timeunit to "minute" 112 */ 113 public static int ph1_coord_minutes(int val) { 114 val = ParamChecker.checkGTZero(val, "n"); 115 ELEvaluator eval = ELEvaluator.getCurrent(); 116 eval.setVariable("timeunit", TimeUnit.MINUTE); 117 eval.setVariable("endOfDuration", TimeUnit.NONE); 118 return val; 119 } 120 121 /** 122 * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p/> Every instance will 123 * start at 00:00 hour of each day. <p/> domain: <code> val > 0</code> and should be integer. 124 * 125 * @param val frequency in number of days. 126 * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day" 127 */ 128 public static int ph1_coord_endOfDays(int val) { 129 val = ParamChecker.checkGTZero(val, "n"); 130 ELEvaluator eval = ELEvaluator.getCurrent(); 131 eval.setVariable("timeunit", TimeUnit.DAY); 132 eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY); 133 return val; 134 } 135 136 /** 137 * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p/> Every instance will 138 * start at first day of each month at 00:00 hour. <p/> domain: <code> val > 0</code> and should be integer. 
139 * 140 * @param val: frequency in number of months. 141 * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month" 142 */ 143 public static int ph1_coord_endOfMonths(int val) { 144 val = ParamChecker.checkGTZero(val, "n"); 145 ELEvaluator eval = ELEvaluator.getCurrent(); 146 eval.setVariable("timeunit", TimeUnit.MONTH); 147 eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH); 148 return val; 149 } 150 151 /** 152 * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p/> Depends on: <p/> 153 * 1. Timezone of both dataset and job <p/> 2. Action creation Time 154 * 155 * @return difference in minutes (DataSet TZ Offset - Application TZ offset) 156 */ 157 public static int ph2_coord_tzOffset() { 158 long actionCreationTime = getActionCreationtime().getTime(); 159 TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ"); 160 TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ"); 161 return (dsTZ.getOffset(actionCreationTime) - jobTZ.getOffset(actionCreationTime)) / (1000 * 60); 162 } 163 164 public static int ph3_coord_tzOffset() { 165 return ph2_coord_tzOffset(); 166 } 167 168 /** 169 * Returns the a date string while given a base date in 'strBaseDate', 170 * offset and unit (e.g. DAY, MONTH, HOUR, MINUTE, MONTH). 
171 * 172 * @param strBaseDate -- base date 173 * @param offset -- any number 174 * @param unit -- DAY, MONTH, HOUR, MINUTE, MONTH 175 * @return date string 176 * @throws Exception 177 */ 178 public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception { 179 Calendar baseCalDate = DateUtils.getCalendar(strBaseDate); 180 StringBuilder buffer = new StringBuilder(); 181 baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset); 182 buffer.append(DateUtils.formatDateOozieTZ(baseCalDate)); 183 return buffer.toString(); 184 } 185 186 public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception { 187 return ph2_coord_dateOffset(strBaseDate, offset, unit); 188 } 189 190 /** 191 * Determine the date-time in Oozie processing timezone of n-th future available dataset instance 192 * from nominal Time but not beyond the instance specified as 'instance. 193 * <p/> 194 * It depends on: 195 * <p/> 196 * 1. Data set frequency 197 * <p/> 198 * 2. Data set Time unit (day, month, minute) 199 * <p/> 200 * 3. Data set Time zone/DST 201 * <p/> 202 * 4. End Day/Month flag 203 * <p/> 204 * 5. Data set initial instance 205 * <p/> 206 * 6. Action Creation Time 207 * <p/> 208 * 7. Existence of dataset's directory 209 * 210 * @param n :instance count 211 * <p/> 212 * domain: n >= 0, n is integer 213 * @param instance: How many future instance it should check? 
value should 214 * be >=0 215 * @return date-time in Oozie processing timezone of the n-th instance 216 * <p/> 217 * @throws Exception 218 */ 219 public static String ph3_coord_future(int n, int instance) throws Exception { 220 ParamChecker.checkGEZero(n, "future:n"); 221 ParamChecker.checkGTZero(instance, "future:instance"); 222 if (isSyncDataSet()) {// For Sync Dataset 223 return coord_future_sync(n, instance); 224 } 225 else { 226 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 227 } 228 } 229 230 /** 231 * Determine the date-time in Oozie processing timezone of the future available dataset instances 232 * from start to end offsets from nominal Time but not beyond the instance specified as 'instance'. 233 * <p/> 234 * It depends on: 235 * <p/> 236 * 1. Data set frequency 237 * <p/> 238 * 2. Data set Time unit (day, month, minute) 239 * <p/> 240 * 3. Data set Time zone/DST 241 * <p/> 242 * 4. End Day/Month flag 243 * <p/> 244 * 5. Data set initial instance 245 * <p/> 246 * 6. Action Creation Time 247 * <p/> 248 * 7. Existence of dataset's directory 249 * 250 * @param start : start instance offset 251 * <p/> 252 * domain: start >= 0, start is integer 253 * @param end : end instance offset 254 * <p/> 255 * domain: end >= 0, end is integer 256 * @param instance: How many future instance it should check? value should 257 * be >=0 258 * @return date-time in Oozie processing timezone of the instances from start to end offsets 259 * delimited by comma. 
260 * <p/> 261 * @throws Exception 262 */ 263 public static String ph3_coord_futureRange(int start, int end, int instance) throws Exception { 264 ParamChecker.checkGEZero(start, "future:n"); 265 ParamChecker.checkGEZero(end, "future:n"); 266 ParamChecker.checkGTZero(instance, "future:instance"); 267 if (isSyncDataSet()) {// For Sync Dataset 268 return coord_futureRange_sync(start, end, instance); 269 } 270 else { 271 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 272 } 273 } 274 275 private static String coord_future_sync(int n, int instance) throws Exception { 276 return coord_futureRange_sync(n, n, instance); 277 } 278 279 private static String coord_futureRange_sync(int startOffset, int endOffset, int instance) throws Exception { 280 final XLog LOG = XLog.getLog(CoordELFunctions.class); 281 final Thread currentThread = Thread.currentThread(); 282 ELEvaluator eval = ELEvaluator.getCurrent(); 283 String retVal = ""; 284 int datasetFrequency = (int) getDSFrequency();// in minutes 285 TimeUnit dsTimeUnit = getDSTimeUnit(); 286 int[] instCount = new int[1]; 287 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount); 288 StringBuilder resolvedInstances = new StringBuilder(); 289 StringBuilder resolvedURIPaths = new StringBuilder(); 290 if (nominalInstanceCal != null) { 291 Calendar initInstance = getInitialInstanceCal(); 292 nominalInstanceCal = (Calendar) initInstance.clone(); 293 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 294 295 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 296 if (ds == null) { 297 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 298 } 299 String uriTemplate = ds.getUriTemplate(); 300 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION); 301 if (conf == null) { 302 throw new RuntimeException("Associated Configuration should be defined with key " + 
CONFIGURATION); 303 } 304 int available = 0, checkedInstance = 0; 305 boolean resolved = false; 306 String user = ParamChecker 307 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); 308 String doneFlag = ds.getDoneFlag(); 309 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 310 URIHandler uriHandler = null; 311 Context uriContext = null; 312 try { 313 while (instance >= checkedInstance && !currentThread.isInterrupted()) { 314 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal); 315 String uriPath = uriEval.evaluate(uriTemplate, String.class); 316 if (uriHandler == null) { 317 URI uri = new URI(uriPath); 318 uriHandler = uriService.getURIHandler(uri); 319 uriContext = uriHandler.getContext(uri, conf, user); 320 } 321 String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag); 322 if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) { 323 if (available == endOffset) { 324 LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag); 325 resolved = true; 326 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); 327 resolvedURIPaths.append(uriPath); 328 retVal = resolvedInstances.toString(); 329 eval.setVariable("resolved_path", resolvedURIPaths.toString()); 330 break; 331 } 332 else if (available >= startOffset) { 333 LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag); 334 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append( 335 INSTANCE_SEPARATOR); 336 resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR); 337 } 338 available++; 339 } 340 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency); 341 nominalInstanceCal = (Calendar) initInstance.clone(); 342 instCount[0]++; 343 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 344 checkedInstance++; 345 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 346 } 347 } 348 finally { 349 if 
(uriContext != null) { 350 uriContext.destroy(); 351 } 352 } 353 if (!resolved) { 354 // return unchanged future function with variable 'is_resolved' 355 // to 'false' 356 eval.setVariable("is_resolved", Boolean.FALSE); 357 if (startOffset == endOffset) { 358 retVal = "${coord:future(" + startOffset + ", " + instance + ")}"; 359 } 360 else { 361 retVal = "${coord:futureRange(" + startOffset + ", " + endOffset + ", " + instance + ")}"; 362 } 363 } 364 else { 365 eval.setVariable("is_resolved", Boolean.TRUE); 366 } 367 } 368 else {// No feasible nominal time 369 eval.setVariable("is_resolved", Boolean.TRUE); 370 retVal = ""; 371 } 372 return retVal; 373 } 374 375 /** 376 * Return nominal time or Action Creation Time. 377 * <p/> 378 * 379 * @return coordinator action creation or materialization date time 380 * @throws Exception if unable to format the Date object to String 381 */ 382 public static String ph2_coord_nominalTime() throws Exception { 383 ELEvaluator eval = ELEvaluator.getCurrent(); 384 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 385 "Coordinator Action"); 386 return DateUtils.formatDateOozieTZ(action.getNominalTime()); 387 } 388 389 public static String ph3_coord_nominalTime() throws Exception { 390 return ph2_coord_nominalTime(); 391 } 392 393 /** 394 * Convert from standard date-time formatting to a desired format. 395 * <p/> 396 * @param dateTimeStr - A timestamp in standard (ISO8601) format. 397 * @param format - A string representing the desired format. 
398 * @return coordinator action creation or materialization date time 399 * @throws Exception if unable to format the Date object to String 400 */ 401 public static String ph2_coord_formatTime(String dateTimeStr, String format) 402 throws Exception { 403 Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr); 404 return DateUtils.formatDateCustom(dateTime, format); 405 } 406 407 public static String ph3_coord_formatTime(String dateTimeStr, String format) 408 throws Exception { 409 return ph2_coord_formatTime(dateTimeStr, format); 410 } 411 412 /** 413 * Return Action Id. <p/> 414 * 415 * @return coordinator action Id 416 */ 417 public static String ph2_coord_actionId() throws Exception { 418 ELEvaluator eval = ELEvaluator.getCurrent(); 419 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 420 "Coordinator Action"); 421 return action.getActionId(); 422 } 423 424 public static String ph3_coord_actionId() throws Exception { 425 return ph2_coord_actionId(); 426 } 427 428 /** 429 * Return Job Name. <p/> 430 * 431 * @return coordinator name 432 */ 433 public static String ph2_coord_name() throws Exception { 434 ELEvaluator eval = ELEvaluator.getCurrent(); 435 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 436 "Coordinator Action"); 437 return action.getName(); 438 } 439 440 public static String ph3_coord_name() throws Exception { 441 return ph2_coord_name(); 442 } 443 444 /** 445 * Return Action Start time. 
<p/> 446 * 447 * @return coordinator action start time 448 * @throws Exception if unable to format the Date object to String 449 */ 450 public static String ph2_coord_actualTime() throws Exception { 451 ELEvaluator eval = ELEvaluator.getCurrent(); 452 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 453 if (coordAction == null) { 454 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 455 } 456 return DateUtils.formatDateOozieTZ(coordAction.getActualTime()); 457 } 458 459 public static String ph3_coord_actualTime() throws Exception { 460 return ph2_coord_actualTime(); 461 } 462 463 /** 464 * Used to specify a list of URI's that are used as input dir to the workflow job. <p/> Look for two evaluator-level 465 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of 466 * URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something 467 * unresolved, this function will echo back the original function <p/> otherwise it sends the uris. 468 * 469 * @param dataInName : Datain name 470 * @return the list of URI's separated by INSTANCE_SEPARATOR <p/> if there are unresolved EL function (i.e. latest) 471 * , echo back <p/> the function without resolving the function. 472 */ 473 public static String ph3_coord_dataIn(String dataInName) { 474 String uris = ""; 475 ELEvaluator eval = ELEvaluator.getCurrent(); 476 uris = (String) eval.getVariable(".datain." + dataInName); 477 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); 478 if (unresolved != null && unresolved.booleanValue() == true) { 479 return "${coord:dataIn('" + dataInName + "')}"; 480 } 481 return uris; 482 } 483 484 /** 485 * Used to specify a list of URI's that are output dir of the workflow job. 
<p/> Look for one evaluator-level 486 * variable <p/> dataout.<DATAOUT_NAME> <p/> It defines the current list of URI. <p/> otherwise it sends the uris. 487 * 488 * @param dataOutName : Dataout name 489 * @return the list of URI's separated by INSTANCE_SEPARATOR 490 */ 491 public static String ph3_coord_dataOut(String dataOutName) { 492 String uris = ""; 493 ELEvaluator eval = ELEvaluator.getCurrent(); 494 uris = (String) eval.getVariable(".dataout." + dataOutName); 495 return uris; 496 } 497 498 /** 499 * Determine the date-time in Oozie processing timezone of n-th dataset instance. <p/> It depends on: <p/> 1. 500 * Data set frequency <p/> 2. 501 * Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. Data 502 * set initial instance <p/> 6. Action Creation Time 503 * 504 * @param n instance count domain: n is integer 505 * @return date-time in Oozie processing timezone of the n-th instance returns 'null' means n-th instance is 506 * earlier than Initial-Instance of DS 507 * @throws Exception 508 */ 509 public static String ph2_coord_current(int n) throws Exception { 510 if (isSyncDataSet()) { // For Sync Dataset 511 return coord_current_sync(n); 512 } 513 else { 514 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 515 } 516 } 517 518 /** 519 * Determine the date-time in Oozie processing timezone of current dataset instances 520 * from start to end offsets from the nominal time. <p/> It depends 521 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST 522 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. 
Action Creation Time 523 * 524 * @param start :start instance offset <p/> domain: start <= 0, start is integer 525 * @param end :end instance offset <p/> domain: end <= 0, end is integer 526 * @return date-time in Oozie processing timezone of the instances from start to end offsets 527 * delimited by comma. <p/> If the current instance time of the dataset based on the Action Creation Time 528 * is earlier than the Initial-Instance of DS an empty string is returned. 529 * If an instance within the range is earlier than Initial-Instance of DS that instance is ignored 530 * @throws Exception 531 */ 532 public static String ph2_coord_currentRange(int start, int end) throws Exception { 533 if (isSyncDataSet()) { // For Sync Dataset 534 return coord_currentRange_sync(start, end); 535 } 536 else { 537 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 538 } 539 } 540 /** 541 * Determine the date-time in Oozie processing timezone of the given offset from the dataset effective nominal time. <p/> It 542 * depends on: <p> 1. Data set frequency <p/> 2. Data set Time Unit <p/> 3. Data set Time zone/DST 543 * <p/> 4. Data set initial instance <p/> 5. Action Creation Time 544 * 545 * @param n offset amount (integer) 546 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR") 547 * @return date-time in Oozie processing timezone of the given offset from the dataset effective nominal time 548 * @throws Exception if there was a problem formatting 549 */ 550 public static String ph2_coord_offset(int n, String timeUnit) throws Exception { 551 if (isSyncDataSet()) { // For Sync Dataset 552 return coord_offset_sync(n, timeUnit); 553 } 554 else { 555 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 556 } 557 } 558 559 /** 560 * Determine how many hours is on the date of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency 561 * <p/> 2. 
Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. 562 * Data set initial instance <p/> 6. Action Creation Time 563 * 564 * @param n instance count <p/> domain: n is integer 565 * @return number of hours on that day <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS 566 * @throws Exception 567 */ 568 public static int ph2_coord_hoursInDay(int n) throws Exception { 569 int datasetFrequency = (int) getDSFrequency(); 570 // /Calendar nominalInstanceCal = 571 // getCurrentInstance(getActionCreationtime()); 572 Calendar nominalInstanceCal = getEffectiveNominalTime(); 573 if (nominalInstanceCal == null) { 574 return -1; 575 } 576 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n); 577 /* 578 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) 579 * { return -1; } 580 */ 581 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ 582 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 583 return DateUtils.hoursInDay(nominalInstanceCal); 584 } 585 586 public static int ph3_coord_hoursInDay(int n) throws Exception { 587 return ph2_coord_hoursInDay(n); 588 } 589 590 /** 591 * Calculate number of days in one month for n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency . 592 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. 593 * Data set initial instance <p/> 6. Action Creation Time 594 * 595 * @param n instance count. 
domain: n is integer 596 * @return number of days in that month <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS 597 * @throws Exception 598 */ 599 public static int ph2_coord_daysInMonth(int n) throws Exception { 600 int datasetFrequency = (int) getDSFrequency();// in minutes 601 // Calendar nominalInstanceCal = 602 // getCurrentInstance(getActionCreationtime()); 603 Calendar nominalInstanceCal = getEffectiveNominalTime(); 604 if (nominalInstanceCal == null) { 605 return -1; 606 } 607 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n); 608 /* 609 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) 610 * { return -1; } 611 */ 612 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ 613 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 614 return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH); 615 } 616 617 public static int ph3_coord_daysInMonth(int n) throws Exception { 618 return ph2_coord_daysInMonth(n); 619 } 620 621 /** 622 * Determine the date-time in Oozie processing timezone of n-th latest available dataset instance. <p/> It depends 623 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST 624 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. 
Existence of 625 * dataset's directory 626 * 627 * @param n :instance count <p/> domain: n <= 0, n is integer 628 * @return date-time in Oozie processing timezone of the n-th instance <p/> returns 'null' means n-th instance is 629 * earlier than Initial-Instance of DS 630 * @throws Exception 631 */ 632 public static String ph3_coord_latest(int n) throws Exception { 633 ParamChecker.checkLEZero(n, "latest:n"); 634 if (isSyncDataSet()) {// For Sync Dataset 635 return coord_latest_sync(n); 636 } 637 else { 638 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 639 } 640 } 641 642 /** 643 * Determine the date-time in Oozie processing timezone of latest available dataset instances 644 * from start to end offsets from the nominal time. <p/> It depends 645 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST 646 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of 647 * dataset's directory 648 * 649 * @param start :start instance offset <p/> domain: start <= 0, start is integer 650 * @param end :end instance offset <p/> domain: end <= 0, end is integer 651 * @return date-time in Oozie processing timezone of the instances from start to end offsets 652 * delimited by comma. <p/> returns 'null' means start offset instance is 653 * earlier than Initial-Instance of DS 654 * @throws Exception 655 */ 656 public static String ph3_coord_latestRange(int start, int end) throws Exception { 657 ParamChecker.checkLEZero(start, "latest:n"); 658 ParamChecker.checkLEZero(end, "latest:n"); 659 if (isSyncDataSet()) {// For Sync Dataset 660 return coord_latestRange_sync(start, end); 661 } 662 else { 663 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 664 } 665 } 666 667 /** 668 * Configure an evaluator with data set and application specific information. 
<p/> Helper method of associating 669 * dataset and application object 670 * 671 * @param evaluator : to set variables 672 * @param ds : Data Set object 673 * @param coordAction : Application instance 674 */ 675 public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) { 676 evaluator.setVariable(COORD_ACTION, coordAction); 677 evaluator.setVariable(DATASET, ds); 678 } 679 680 /** 681 * Helper method to wrap around with "${..}". <p/> 682 * 683 * 684 * @param eval :EL evaluator 685 * @param expr : expression to evaluate 686 * @return Resolved expression or echo back the same expression 687 * @throws Exception 688 */ 689 public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception { 690 try { 691 eval.setVariable(".wrap", null); 692 String result = eval.evaluate(expr, String.class); 693 if (eval.getVariable(".wrap") != null) { 694 return "${" + result + "}"; 695 } 696 else { 697 return result; 698 } 699 } 700 catch (Exception e) { 701 throw new Exception("Unable to evaluate :" + expr + ":\n", e); 702 } 703 } 704 705 // Set of echo functions 706 707 public static String ph1_coord_current_echo(String n) { 708 return echoUnResolved("current", n); 709 } 710 711 public static String ph1_coord_absolute_echo(String date) { 712 return echoUnResolved("absolute", date); 713 } 714 715 public static String ph1_coord_currentRange_echo(String start, String end) { 716 return echoUnResolved("currentRange", start + ", " + end); 717 } 718 719 public static String ph1_coord_offset_echo(String n, String timeUnit) { 720 return echoUnResolved("offset", n + " , " + timeUnit); 721 } 722 723 public static String ph2_coord_current_echo(String n) { 724 return echoUnResolved("current", n); 725 } 726 727 public static String ph2_coord_currentRange_echo(String start, String end) { 728 return echoUnResolved("currentRange", start + ", " + end); 729 } 730 731 public static String ph2_coord_offset_echo(String n, String timeUnit) 
{
        return echoUnResolved("offset", n + " , " + timeUnit);
    }

    /**
     * Phase-2 echo of {@code coord:absolute(date)}: returns the call unresolved (via {@code echoUnResolved}).
     *
     * @param date the date argument, echoed unchanged
     * @return the text {@code coord:absolute(<date>)}
     */
    public static String ph2_coord_absolute_echo(String date) {
        return echoUnResolved("absolute", date);
    }

    /**
     * Resolves {@code coord:absoluteRange(startInstance, end)}.
     * <p/>
     * The absolute start instance is converted into a {@code current} offset relative to the action's nominal time.
     * The range is then delegated to {@code ph2_coord_currentRange(start, end)}.
     *
     * @param startInstance the absolute start instance, parsed with {@code DateUtils.getCalendar}
     * @param end the end of the range, expressed as a {@code current} offset
     * @return whatever {@code ph2_coord_currentRange(start, end)} returns
     * @throws CommandException (E1010) if the dataset's initial instance is later than the start instance or than the
     *         nominal time, or if the start instance maps to an offset greater than {@code end}
     * @throws Exception propagated from date parsing or range resolution
     */
    public static String ph2_coord_absolute_range(String startInstance, int end) throws Exception {
        int[] instanceCount = new int[1];

        // getCurrentInstance() returns null, which means startInstance is earlier
        // than the initial instance
        if (getCurrentInstance(DateUtils.getCalendar(startInstance).getTime(), instanceCount) == null) {
            throw new CommandException(ErrorCode.E1010,
                    "intial-instance should be equal or earlier than the start-instance. intial-instance is "
                            + getInitialInstance() + " and start-instance is " + startInstance);
        }
        int[] nominalCount = new int[1];
        if (getCurrentInstance(getActionCreationtime(), nominalCount) == null) {
            throw new CommandException(ErrorCode.E1010,
                    "intial-instance should be equal or earlier than the nominal time. intial-instance is "
                            + getInitialInstance() + " and nominal time is " + getActionCreationtime());
        }
        // getCurrentInstance() returns counts relative to the initial instance, so
        // (start instance count - nominal count) is the start offset relative to the
        // nominal time-stamp, i.e. the x in current(x).
        int start = instanceCount[0] - nominalCount[0];
        if (start > end) {
            // Report 'start' (the current() offset compared with 'end'), not the raw count
            // relative to the initial instance.
            throw new CommandException(ErrorCode.E1010,
                    "start-instance should be equal or earlier than the end-instance. startInstance is "
                            + startInstance + " which is equivalent to current (" + start
                            + ") but end is specified as current (" + end + ")");
        }
        return ph2_coord_currentRange(start, end);
    }

    /** Phase-1 echo of {@code coord:dateOffset(n, offset, unit)}; returns the call unresolved. */
    public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) {
        return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit);
    }

    /** Phase-1 echo of {@code coord:formatTime(dateTime, format)}; returns the call unresolved. */
    public static String ph1_coord_formatTime_echo(String dateTime, String format) {
        // Quote the dateTime value since it would contain a ':'.
        return echoUnResolved("formatTime", "'" + dateTime + "'" + " , " + format);
    }

    /** Phase-1 echo of {@code coord:latest(n)}; returns the call unresolved. */
    public static String ph1_coord_latest_echo(String n) {
        return echoUnResolved("latest", n);
    }

    /** Phase-2 echo of {@code coord:latest(n)}; same output as the phase-1 echo. */
    public static String ph2_coord_latest_echo(String n) {
        return ph1_coord_latest_echo(n);
    }

    /** Phase-1 echo of {@code coord:future(n, instance)}; returns the call unresolved. */
    public static String ph1_coord_future_echo(String n, String instance) {
        return echoUnResolved("future", n + ", " + instance);
    }

    /** Phase-2 echo of {@code coord:future(n, instance)}; same output as the phase-1 echo. */
    public static String ph2_coord_future_echo(String n, String instance) {
        return ph1_coord_future_echo(n, instance);
    }

    /** Phase-1 echo of {@code coord:latestRange(start, end)}; returns the call unresolved. */
    public static String ph1_coord_latestRange_echo(String start, String end) {
        return echoUnResolved("latestRange", start + ", " + end);
    }

    /** Phase-2 echo of {@code coord:latestRange(start, end)}; same output as the phase-1 echo. */
    public static String ph2_coord_latestRange_echo(String start, String end) {
        return ph1_coord_latestRange_echo(start, end);
    }

    /** Phase-1 echo of {@code coord:futureRange(start, end, instance)}; returns the call unresolved. */
    public static String ph1_coord_futureRange_echo(String start, String end, String instance) {
        return echoUnResolved("futureRange", start + ", " + end + ", " + instance);
    }

    /** Phase-2 echo of {@code coord:futureRange(start, end, instance)}; same output as the phase-1 echo. */
    public static String ph2_coord_futureRange_echo(String start, String end, String instance) {
        return ph1_coord_futureRange_echo(start, end, instance);
    }

    /**
     * Phase-1 echo of {@code coord:dataIn(name)}.
     *
     * @param n the data-in name; must have been registered as {@code oozie.dataname.<n> = "data-in"}
     * @return the unresolved {@code coord:dataIn('<n>')} text
     * @throws RuntimeException if {@code n} is not a declared data-in name
     */
    public static String ph1_coord_dataIn_echo(String n) {
        ELEvaluator eval = ELEvaluator.getCurrent();
        String val = (String) eval.getVariable("oozie.dataname." + n);
        if (!"data-in".equals(val)) {
            XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid");
            throw new RuntimeException("data_in_name " + n + " is not valid");
        }
        return echoUnResolved("dataIn", "'" + n + "'");
    }

    /**
     * Phase-1 echo of {@code coord:dataOut(name)}.
     *
     * @param n the data-out name; must have been registered as {@code oozie.dataname.<n> = "data-out"}
     * @return the unresolved {@code coord:dataOut('<n>')} text
     * @throws RuntimeException if {@code n} is not a declared data-out name
     */
    public static String ph1_coord_dataOut_echo(String n) {
        ELEvaluator eval = ELEvaluator.getCurrent();
        String val = (String) eval.getVariable("oozie.dataname." + n);
        if (!"data-out".equals(val)) {
            XLog.getLog(CoordELFunctions.class).error("data_out_name " + n + " is not valid");
            throw new RuntimeException("data_out_name " + n + " is not valid");
        }
        return echoUnResolved("dataOut", "'" + n + "'");
    }

    /** Phase-1 echo of {@code coord:nominalTime()}; returns the call unresolved. */
    public static String ph1_coord_nominalTime_echo() {
        return echoUnResolved("nominalTime", "");
    }

    /** Phase-1 echo of {@code coord:nominalTime()} (wrapping variant); returns the call unresolved. */
    public static String ph1_coord_nominalTime_echo_wrap() {
        return echoUnResolved("nominalTime", "");
    }

    /**
     * Returns a fixed dummy value instead of resolving {@code coord:nominalTime()}.
     * NOTE(review): "2009-03-06T010:00" has a three-digit hour and no zone designator, so it is not a well-formed
     * Oozie timestamp; confirm that no caller parses it.
     */
    public static String ph1_coord_nominalTime_echo_fixed() {
        return "2009-03-06T010:00"; // Dummy resolution
    }

    /** Phase-1 echo of {@code coord:actualTime()} (wrapping variant); returns the call unresolved. */
    public static String ph1_coord_actualTime_echo_wrap() {
        return echoUnResolved("actualTime", "");
    }

    /** Phase-1 echo of {@code coord:actionId()}; returns the call unresolved. */
    public static String ph1_coord_actionId_echo() {
        return echoUnResolved("actionId", "");
    }

    /** Phase-1 echo of {@code coord:name()}; returns the call unresolved. */
    public static String ph1_coord_name_echo() {
        return echoUnResolved("name", "");
    }

    // The following echo functions are not used in any phase yet.
    // They are kept here for future use.
857 public static String coord_minutes_echo(String n) { 858 return echoUnResolved("minutes", n); 859 } 860 861 public static String coord_hours_echo(String n) { 862 return echoUnResolved("hours", n); 863 } 864 865 public static String coord_days_echo(String n) { 866 return echoUnResolved("days", n); 867 } 868 869 public static String coord_endOfDay_echo(String n) { 870 return echoUnResolved("endOfDay", n); 871 } 872 873 public static String coord_months_echo(String n) { 874 return echoUnResolved("months", n); 875 } 876 877 public static String coord_endOfMonth_echo(String n) { 878 return echoUnResolved("endOfMonth", n); 879 } 880 881 public static String coord_actualTime_echo() { 882 return echoUnResolved("actualTime", ""); 883 } 884 885 // This echo function will always return "24" for validation only. 886 // This evaluation ****should not**** replace the original XML 887 // Create a temporary string and validate the function 888 // This is **required** for evaluating an expression like 889 // coord:HoursInDay(0) + 3 890 // actual evaluation will happen in phase 2 or phase 3. 891 public static String ph1_coord_hoursInDay_echo(String n) { 892 return "24"; 893 // return echoUnResolved("hoursInDay", n); 894 } 895 896 // This echo function will always return "30" for validation only. 897 // This evaluation ****should not**** replace the original XML 898 // Create a temporary string and validate the function 899 // This is **required** for evaluating an expression like 900 // coord:daysInMonth(0) + 3 901 // actual evaluation will happen in phase 2 or phase 3. 902 public static String ph1_coord_daysInMonth_echo(String n) { 903 // return echoUnResolved("daysInMonth", n); 904 return "30"; 905 } 906 907 // This echo function will always return "3" for validation only. 
908 // This evaluation ****should not**** replace the original XML 909 // Create a temporary string and validate the function 910 // This is **required** for evaluating an expression like coord:tzOffset + 2 911 // actual evaluation will happen in phase 2 or phase 3. 912 public static String ph1_coord_tzOffset_echo() { 913 // return echoUnResolved("tzOffset", ""); 914 return "3"; 915 } 916 917 // Local methods 918 /** 919 * @param n 920 * @return n-th instance Date-Time from current instance for data-set <p/> return empty string ("") if the 921 * Action_Creation_time or the n-th instance <p/> is earlier than the Initial_Instance of dataset. 922 * @throws Exception 923 */ 924 private static String coord_current_sync(int n) throws Exception { 925 return coord_currentRange_sync(n, n); 926 } 927 928 private static String coord_currentRange_sync(int start, int end) throws Exception { 929 final XLog LOG = XLog.getLog(CoordELFunctions.class); 930 int datasetFrequency = getDSFrequency();// in minutes 931 TimeUnit dsTimeUnit = getDSTimeUnit(); 932 int[] instCount = new int[1];// used as pass by ref 933 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount); 934 if (nominalInstanceCal == null) { 935 LOG.warn("If the initial instance of the dataset is later than the nominal time, an empty string is" 936 + " returned. This means that no data is available at the current-instance specified by the user" 937 + " and the user could try modifying his initial-instance to an earlier time."); 938 return ""; 939 } else { 940 Calendar initInstance = getInitialInstanceCal(); 941 // Add in the reverse order - newest instance first. 
942 nominalInstanceCal = (Calendar) initInstance.clone(); 943 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), (instCount[0] + start) * datasetFrequency); 944 List<String> instances = new ArrayList<String>(); 945 for (int i = start; i <= end; i++) { 946 if (nominalInstanceCal.compareTo(initInstance) < 0) { 947 LOG.warn("If the initial instance of the dataset is later than the current-instance specified," 948 + " such as coord:current({0}) in this case, an empty string is returned. This means that" 949 + " no data is available at the current-instance specified by the user and the user could" 950 + " try modifying his initial-instance to an earlier time.", start); 951 } 952 else { 953 instances.add(DateUtils.formatDateOozieTZ(nominalInstanceCal)); 954 } 955 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency); 956 } 957 instances = Lists.reverse(instances); 958 return StringUtils.join(instances, CoordELFunctions.INSTANCE_SEPARATOR); 959 } 960 } 961 962 /** 963 * 964 * @param n offset amount (integer) 965 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR") 966 * @return the offset time from the effective nominal time <p/> return empty string ("") if the Action_Creation_time or the 967 * offset instance <p/> is earlier than the Initial_Instance of dataset. 
968 * @throws Exception 969 */ 970 private static String coord_offset_sync(int n, String timeUnit) throws Exception { 971 Calendar rawCal = resolveOffsetRawTime(n, TimeUnit.valueOf(timeUnit), null); 972 if (rawCal == null) { 973 // warning already logged by resolveOffsetRawTime() 974 return ""; 975 } 976 977 int freq = getDSFrequency(); 978 TimeUnit freqUnit = getDSTimeUnit(); 979 int freqCount = 0; 980 // We're going to manually turn back/forward cal by decrements/increments of freq and then check that it gives the same 981 // time as rawCal; this is to check that the offset time resolves to a frequency offset of the effective nominal time 982 // In other words, that there exists an integer x, such that coord:offset(n, timeUnit) == coord:current(x) is true 983 // If not, then we'll "rewind" rawCal to the latest instance earlier than rawCal and use that. 984 Calendar cal = getInitialInstanceCal(); 985 if (rawCal.before(cal)) { 986 while (cal.after(rawCal)) { 987 cal.add(freqUnit.getCalendarUnit(), -freq); 988 freqCount--; 989 } 990 } 991 else if (rawCal.after(cal)) { 992 while (cal.before(rawCal)) { 993 cal.add(freqUnit.getCalendarUnit(), freq); 994 freqCount++; 995 } 996 } 997 if (cal.before(rawCal)) { 998 rawCal = cal; 999 } 1000 else if (cal.after(rawCal)) { 1001 cal.add(freqUnit.getCalendarUnit(), -freq); 1002 rawCal = cal; 1003 freqCount--; 1004 } 1005 String rawCalStr = DateUtils.formatDateOozieTZ(rawCal); 1006 1007 Calendar nominalInstanceCal = getInitialInstanceCal(); 1008 nominalInstanceCal.add(freqUnit.getCalendarUnit(), freq * freqCount); 1009 if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) { 1010 XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the offset instance" 1011 + " specified, such as coord:offset({0}, {1}) in this case, an empty string is returned. 
This means that no" 1012 + " data is available at the offset instance specified by the user and the user could try modifying his" 1013 + " initial-instance to an earlier time.", n, timeUnit); 1014 return ""; 1015 } 1016 String nominalCalStr = DateUtils.formatDateOozieTZ(nominalInstanceCal); 1017 1018 if (!rawCalStr.equals(nominalCalStr)) { 1019 throw new RuntimeException("Shouldn't happen"); 1020 } 1021 return rawCalStr; 1022 } 1023 1024 /** 1025 * @param offset 1026 * @return n-th available latest instance Date-Time for SYNC data-set 1027 * @throws Exception 1028 */ 1029 private static String coord_latest_sync(int offset) throws Exception { 1030 return coord_latestRange_sync(offset, offset); 1031 } 1032 1033 private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception { 1034 final XLog LOG = XLog.getLog(CoordELFunctions.class); 1035 final Thread currentThread = Thread.currentThread(); 1036 ELEvaluator eval = ELEvaluator.getCurrent(); 1037 String retVal = ""; 1038 int datasetFrequency = (int) getDSFrequency();// in minutes 1039 TimeUnit dsTimeUnit = getDSTimeUnit(); 1040 int[] instCount = new int[1]; 1041 boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false); 1042 Calendar nominalInstanceCal; 1043 if (useCurrentTime) { 1044 nominalInstanceCal = getCurrentInstance(new Date(), instCount); 1045 } 1046 else { 1047 nominalInstanceCal = getCurrentInstance(getActualTime(), instCount); 1048 } 1049 StringBuilder resolvedInstances = new StringBuilder(); 1050 StringBuilder resolvedURIPaths = new StringBuilder(); 1051 if (nominalInstanceCal != null) { 1052 Calendar initInstance = getInitialInstanceCal(); 1053 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1054 if (ds == null) { 1055 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1056 } 1057 String uriTemplate = ds.getUriTemplate(); 1058 Configuration conf = (Configuration) 
eval.getVariable(CONFIGURATION); 1059 if (conf == null) { 1060 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION); 1061 } 1062 int available = 0; 1063 boolean resolved = false; 1064 String user = ParamChecker 1065 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); 1066 String doneFlag = ds.getDoneFlag(); 1067 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 1068 URIHandler uriHandler = null; 1069 Context uriContext = null; 1070 try { 1071 while (nominalInstanceCal.compareTo(initInstance) >= 0 && !currentThread.isInterrupted()) { 1072 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal); 1073 String uriPath = uriEval.evaluate(uriTemplate, String.class); 1074 if (uriHandler == null) { 1075 URI uri = new URI(uriPath); 1076 uriHandler = uriService.getURIHandler(uri); 1077 uriContext = uriHandler.getContext(uri, conf, user); 1078 } 1079 String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag); 1080 if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) { 1081 XLog.getLog(CoordELFunctions.class) 1082 .debug("Found latest(" + available + "): " + uriWithDoneFlag); 1083 if (available == startOffset) { 1084 LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag); 1085 resolved = true; 1086 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); 1087 resolvedURIPaths.append(uriPath); 1088 retVal = resolvedInstances.toString(); 1089 eval.setVariable("resolved_path", resolvedURIPaths.toString()); 1090 break; 1091 } 1092 else if (available <= endOffset) { 1093 LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag); 1094 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append( 1095 INSTANCE_SEPARATOR); 1096 resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR); 1097 } 1098 1099 available--; 1100 } 1101 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), 
-datasetFrequency); 1102 nominalInstanceCal = (Calendar) initInstance.clone(); 1103 instCount[0]--; 1104 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 1105 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 1106 } 1107 } 1108 finally { 1109 if (uriContext != null) { 1110 uriContext.destroy(); 1111 } 1112 } 1113 if (!resolved) { 1114 // return unchanged latest function with variable 'is_resolved' 1115 // to 'false' 1116 eval.setVariable("is_resolved", Boolean.FALSE); 1117 if (startOffset == endOffset) { 1118 retVal = "${coord:latest(" + startOffset + ")}"; 1119 } 1120 else { 1121 retVal = "${coord:latestRange(" + startOffset + "," + endOffset + ")}"; 1122 } 1123 } 1124 else { 1125 eval.setVariable("is_resolved", Boolean.TRUE); 1126 } 1127 } 1128 else {// No feasible nominal time 1129 eval.setVariable("is_resolved", Boolean.FALSE); 1130 } 1131 return retVal; 1132 } 1133 1134 /** 1135 * @param tm 1136 * @return a new Evaluator to be used for URI-template evaluation 1137 */ 1138 private static ELEvaluator getUriEvaluator(Calendar tm) { 1139 tm.setTimeZone(DateUtils.getOozieProcessingTimeZone()); 1140 ELEvaluator retEval = new ELEvaluator(); 1141 retEval.setVariable("YEAR", tm.get(Calendar.YEAR)); 1142 retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm 1143 .get(Calendar.MONTH) + 1)); 1144 retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm 1145 .get(Calendar.DAY_OF_MONTH)); 1146 retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm 1147 .get(Calendar.HOUR_OF_DAY)); 1148 retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? 
"0" + tm.get(Calendar.MINUTE) : tm 1149 .get(Calendar.MINUTE)); 1150 return retEval; 1151 } 1152 1153 /** 1154 * @return whether a data set is SYNCH or ASYNC 1155 */ 1156 private static boolean isSyncDataSet() { 1157 ELEvaluator eval = ELEvaluator.getCurrent(); 1158 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1159 if (ds == null) { 1160 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1161 } 1162 return ds.getType().equalsIgnoreCase("SYNC"); 1163 } 1164 1165 /** 1166 * Check whether a function should be resolved. 1167 * 1168 * @param functionName 1169 * @param n 1170 * @return null if the functionName needs to be resolved otherwise return the calling function unresolved. 1171 */ 1172 private static String checkIfResolved(String functionName, String n) { 1173 ELEvaluator eval = ELEvaluator.getCurrent(); 1174 String replace = (String) eval.getVariable("resolve_" + functionName); 1175 if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't 1176 // resolve 1177 // return "${coord:" + functionName + "(" + n +")}"; //Unresolved 1178 eval.setVariable(".wrap", "true"); 1179 return "coord:" + functionName + "(" + n + ")"; // Unresolved 1180 } 1181 return null; // Resolved it 1182 } 1183 1184 private static String echoUnResolved(String functionName, String n) { 1185 return echoUnResolvedPre(functionName, n, "coord:"); 1186 } 1187 1188 private static String echoUnResolvedPre(String functionName, String n, String prefix) { 1189 ELEvaluator eval = ELEvaluator.getCurrent(); 1190 eval.setVariable(".wrap", "true"); 1191 return prefix + functionName + "(" + n + ")"; // Unresolved 1192 } 1193 1194 /** 1195 * @return the initial instance of a DataSet in DATE 1196 */ 1197 private static Date getInitialInstance() { 1198 ELEvaluator eval = ELEvaluator.getCurrent(); 1199 return getInitialInstance(eval); 1200 } 1201 1202 /** 1203 * @return the initial instance of a DataSet in DATE 1204 */ 1205 
private static Date getInitialInstance(ELEvaluator eval) { 1206 return getInitialInstanceCal(eval).getTime(); 1207 // return ds.getInitInstance(); 1208 } 1209 1210 /** 1211 * @return the initial instance of a DataSet in Calendar 1212 */ 1213 private static Calendar getInitialInstanceCal() { 1214 ELEvaluator eval = ELEvaluator.getCurrent(); 1215 return getInitialInstanceCal(eval); 1216 } 1217 1218 /** 1219 * @return the initial instance of a DataSet in Calendar 1220 */ 1221 private static Calendar getInitialInstanceCal(ELEvaluator eval) { 1222 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1223 if (ds == null) { 1224 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1225 } 1226 Calendar effInitTS = new GregorianCalendar(ds.getTimeZone()); 1227 effInitTS.setTime(ds.getInitInstance()); 1228 // To adjust EOD/EOM 1229 DateUtils.moveToEnd(effInitTS, getDSEndOfFlag(eval)); 1230 return effInitTS; 1231 // return ds.getInitInstance(); 1232 } 1233 1234 /** 1235 * @return Nominal or action creation Time when all the dependencies of an application instance are met. 1236 */ 1237 private static Date getActionCreationtime() { 1238 ELEvaluator eval = ELEvaluator.getCurrent(); 1239 return getActionCreationtime(eval); 1240 } 1241 1242 /** 1243 * @return Nominal or action creation Time when all the dependencies of an application instance are met. 1244 */ 1245 private static Date getActionCreationtime(ELEvaluator eval) { 1246 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 1247 if (coordAction == null) { 1248 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 1249 } 1250 return coordAction.getNominalTime(); 1251 } 1252 1253 /** 1254 * @return Actual Time when all the dependencies of an application instance are met. 
1255 */ 1256 private static Date getActualTime() { 1257 ELEvaluator eval = ELEvaluator.getCurrent(); 1258 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 1259 if (coordAction == null) { 1260 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 1261 } 1262 return coordAction.getActualTime(); 1263 } 1264 1265 /** 1266 * @return TimeZone for the application or job. 1267 */ 1268 private static TimeZone getJobTZ() { 1269 ELEvaluator eval = ELEvaluator.getCurrent(); 1270 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 1271 if (coordAction == null) { 1272 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 1273 } 1274 return coordAction.getTimeZone(); 1275 } 1276 1277 /** 1278 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time) 1279 * 1280 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of 1281 * the dataset. 1282 */ 1283 public static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) { 1284 ELEvaluator eval = ELEvaluator.getCurrent(); 1285 return getCurrentInstance(effectiveTime, instanceCount, eval); 1286 } 1287 1288 /** 1289 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time) 1290 * 1291 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of 1292 * the dataset. 
1293 */ 1294 private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[], ELEvaluator eval) { 1295 Date datasetInitialInstance = getInitialInstance(eval); 1296 TimeUnit dsTimeUnit = getDSTimeUnit(eval); 1297 TimeZone dsTZ = getDatasetTZ(eval); 1298 int dsFreq = getDSFrequency(eval); 1299 // Convert Date to Calendar for corresponding TZ 1300 Calendar current = Calendar.getInstance(dsTZ); 1301 current.setTime(datasetInitialInstance); 1302 1303 Calendar calEffectiveTime = new GregorianCalendar(dsTZ); 1304 calEffectiveTime.setTime(effectiveTime); 1305 if (instanceCount == null) { // caller doesn't care about this value 1306 instanceCount = new int[1]; 1307 } 1308 instanceCount[0] = 0; 1309 if (current.compareTo(calEffectiveTime) > 0) { 1310 return null; 1311 } 1312 1313 switch(dsTimeUnit) { 1314 case MINUTE: 1315 instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / MINUTE_MSEC); 1316 break; 1317 case HOUR: 1318 instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / HOUR_MSEC); 1319 break; 1320 case DAY: 1321 case END_OF_DAY: 1322 instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / DAY_MSEC); 1323 break; 1324 case MONTH: 1325 case END_OF_MONTH: 1326 instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / MONTH_MSEC); 1327 break; 1328 case YEAR: 1329 instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / YEAR_MSEC); 1330 break; 1331 default: 1332 throw new IllegalArgumentException("Unhandled dataset time unit " + dsTimeUnit); 1333 } 1334 1335 if (instanceCount[0] > 2) { 1336 instanceCount[0] = (instanceCount[0] / dsFreq); 1337 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq); 1338 } else { 1339 instanceCount[0] = 0; 1340 } 1341 while (!current.getTime().after(effectiveTime)) { 1342 current.add(dsTimeUnit.getCalendarUnit(), dsFreq); 1343 instanceCount[0]++; 
1344 } 1345 current.add(dsTimeUnit.getCalendarUnit(), -dsFreq); 1346 instanceCount[0]--; 1347 return current; 1348 } 1349 1350 /** 1351 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time) 1352 * 1353 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of 1354 * the dataset. 1355 */ 1356 private static Calendar getCurrentInstance_old(Date effectiveTime, int instanceCount[], ELEvaluator eval) { 1357 Date datasetInitialInstance = getInitialInstance(eval); 1358 TimeUnit dsTimeUnit = getDSTimeUnit(eval); 1359 TimeZone dsTZ = getDatasetTZ(eval); 1360 int dsFreq = getDSFrequency(eval); 1361 // Convert Date to Calendar for corresponding TZ 1362 Calendar current = Calendar.getInstance(); 1363 current.setTime(datasetInitialInstance); 1364 current.setTimeZone(dsTZ); 1365 1366 Calendar calEffectiveTime = Calendar.getInstance(); 1367 calEffectiveTime.setTime(effectiveTime); 1368 calEffectiveTime.setTimeZone(dsTZ); 1369 if (instanceCount == null) { // caller doesn't care about this value 1370 instanceCount = new int[1]; 1371 } 1372 instanceCount[0] = 0; 1373 if (current.compareTo(calEffectiveTime) > 0) { 1374 return null; 1375 } 1376 Calendar origCurrent = (Calendar) current.clone(); 1377 while (current.compareTo(calEffectiveTime) <= 0) { 1378 current = (Calendar) origCurrent.clone(); 1379 instanceCount[0]++; 1380 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq); 1381 } 1382 instanceCount[0]--; 1383 1384 current = (Calendar) origCurrent.clone(); 1385 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq); 1386 return current; 1387 } 1388 1389 public static Calendar getEffectiveNominalTime() { 1390 Date datasetInitialInstance = getInitialInstance(); 1391 TimeZone dsTZ = getDatasetTZ(); 1392 // Convert Date to Calendar for corresponding TZ 1393 Calendar current = Calendar.getInstance(); 1394 current.setTime(datasetInitialInstance); 1395 
current.setTimeZone(dsTZ); 1396 1397 Calendar calEffectiveTime = Calendar.getInstance(); 1398 calEffectiveTime.setTime(getActionCreationtime()); 1399 calEffectiveTime.setTimeZone(dsTZ); 1400 if (current.compareTo(calEffectiveTime) > 0) { 1401 // Nominal Time < initial Instance 1402 // TODO: getClass() call doesn't work from static method. 1403 // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+ 1404 // current.getTime()); 1405 return null; 1406 } 1407 return calEffectiveTime; 1408 } 1409 1410 /** 1411 * @return dataset frequency in minutes 1412 */ 1413 private static int getDSFrequency() { 1414 ELEvaluator eval = ELEvaluator.getCurrent(); 1415 return getDSFrequency(eval); 1416 } 1417 1418 /** 1419 * @return dataset frequency in minutes 1420 */ 1421 private static int getDSFrequency(ELEvaluator eval) { 1422 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1423 if (ds == null) { 1424 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1425 } 1426 return ds.getFrequency(); 1427 } 1428 1429 /** 1430 * @return dataset TimeUnit 1431 */ 1432 private static TimeUnit getDSTimeUnit() { 1433 ELEvaluator eval = ELEvaluator.getCurrent(); 1434 return getDSTimeUnit(eval); 1435 } 1436 1437 /** 1438 * @return dataset TimeUnit 1439 */ 1440 private static TimeUnit getDSTimeUnit(ELEvaluator eval) { 1441 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1442 if (ds == null) { 1443 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1444 } 1445 return ds.getTimeUnit(); 1446 } 1447 1448 /** 1449 * @return dataset TimeZone 1450 */ 1451 public static TimeZone getDatasetTZ() { 1452 ELEvaluator eval = ELEvaluator.getCurrent(); 1453 return getDatasetTZ(eval); 1454 } 1455 1456 /** 1457 * @return dataset TimeZone 1458 */ 1459 private static TimeZone getDatasetTZ(ELEvaluator eval) { 1460 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 
1461 if (ds == null) { 1462 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1463 } 1464 return ds.getTimeZone(); 1465 } 1466 1467 /** 1468 * @return dataset TimeUnit 1469 */ 1470 private static TimeUnit getDSEndOfFlag() { 1471 ELEvaluator eval = ELEvaluator.getCurrent(); 1472 return getDSEndOfFlag(eval); 1473 } 1474 1475 /** 1476 * @return dataset TimeUnit 1477 */ 1478 private static TimeUnit getDSEndOfFlag(ELEvaluator eval) { 1479 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1480 if (ds == null) { 1481 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1482 } 1483 return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration(); 1484 } 1485 1486 /** 1487 * Return a job configuration property for the coordinator. 1488 * 1489 * @param property property name. 1490 * @return the value of the property, <code>null</code> if the property is undefined. 1491 */ 1492 public static String coord_conf(String property) { 1493 ELEvaluator eval = ELEvaluator.getCurrent(); 1494 return (String) eval.getVariable(property); 1495 } 1496 1497 /** 1498 * Return the user that submitted the coordinator job. 1499 * 1500 * @return the user that submitted the coordinator job. 1501 */ 1502 public static String coord_user() { 1503 ELEvaluator eval = ELEvaluator.getCurrent(); 1504 return (String) eval.getVariable(OozieClient.USER_NAME); 1505 } 1506 1507 /** 1508 * Takes two offset times and returns a list of multiples of the frequency offset from the effective nominal time that occur 1509 * between them. The caller should make sure that startCal is earlier than endCal. 
1510 * <p> 1511 * As a simple example, assume its the same day: startCal is 1:00, endCal is 2:00, frequency is 20min, and effective nominal 1512 * time is 1:20 -- then this method would return a list containing: -20, 0, 20, 40, 60 1513 * 1514 * @param startCal The earlier offset time 1515 * @param endCal The later offset time 1516 * @param eval The ELEvaluator to use; cannot be null 1517 * @return A list of multiple of the frequency offset from the effective nominal time that occur between the startCal and endCal 1518 */ 1519 public static List<Integer> expandOffsetTimes(Calendar startCal, Calendar endCal, ELEvaluator eval) { 1520 List<Integer> expandedFreqs = new ArrayList<Integer>(); 1521 // Use eval because the "current" eval isn't set 1522 int freq = getDSFrequency(eval); 1523 TimeUnit freqUnit = getDSTimeUnit(eval); 1524 Calendar cal = getCurrentInstance(getActionCreationtime(eval), null, eval); 1525 int totalFreq = 0; 1526 if (startCal.before(cal)) { 1527 while (cal.after(startCal)) { 1528 cal.add(freqUnit.getCalendarUnit(), -freq); 1529 totalFreq += -freq; 1530 } 1531 if (cal.before(startCal)) { 1532 cal.add(freqUnit.getCalendarUnit(), freq); 1533 totalFreq += freq; 1534 } 1535 } 1536 else if (startCal.after(cal)) { 1537 while (cal.before(startCal)) { 1538 cal.add(freqUnit.getCalendarUnit(), freq); 1539 totalFreq += freq; 1540 } 1541 } 1542 // At this point, cal is the smallest multiple of the dataset frequency that is >= to the startCal and offset from the 1543 // effective nominal time. Now we can find all of the instances that occur between startCal and endCal, inclusive. 
1544 while (cal.before(endCal) || cal.equals(endCal)) { 1545 expandedFreqs.add(totalFreq); 1546 cal.add(freqUnit.getCalendarUnit(), freq); 1547 totalFreq += freq; 1548 } 1549 return expandedFreqs; 1550 } 1551 1552 /** 1553 * Resolve the offset time from the effective nominal time 1554 * 1555 * @param n offset amount (integer) 1556 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR") 1557 * @param eval The ELEvaluator to use; or null to use the "current" eval 1558 * @return A Calendar of the offset time 1559 */ 1560 public static Calendar resolveOffsetRawTime(int n, TimeUnit timeUnit, ELEvaluator eval) { 1561 // Use eval if given (for when the "current" eval isn't set) 1562 Calendar cal; 1563 if (eval == null) { 1564 cal = getCurrentInstance(getActionCreationtime(), null); 1565 } 1566 else { 1567 cal = getCurrentInstance(getActionCreationtime(eval), null, eval); 1568 } 1569 if (cal == null) { 1570 XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the nominal time, an" 1571 + " empty string is returned. This means that no data is available at the offset instance specified by the user" 1572 + " and the user could try modifying his or her initial-instance to an earlier time."); 1573 return null; 1574 } 1575 cal.add(timeUnit.getCalendarUnit(), n); 1576 return cal; 1577 } 1578}