001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.coord; 019 020 import java.io.IOException; 021 import java.util.Calendar; 022 import java.util.Date; 023 import java.util.TimeZone; 024 025 import org.apache.hadoop.conf.Configuration; 026 import org.apache.hadoop.fs.Path; 027 028 import org.apache.oozie.client.OozieClient; 029 import org.apache.oozie.util.DateUtils; 030 import org.apache.oozie.util.ELEvaluator; 031 import org.apache.oozie.util.ParamChecker; 032 import org.apache.oozie.util.XLog; 033 import org.apache.oozie.service.HadoopAccessorException; 034 import org.apache.oozie.service.Services; 035 import org.apache.oozie.service.HadoopAccessorService; 036 037 /** 038 * This class implements the EL function related to coordinator 039 */ 040 041 public class CoordELFunctions { 042 final private static String DATASET = "oozie.coord.el.dataset.bean"; 043 final private static String COORD_ACTION = "oozie.coord.el.app.bean"; 044 final public static String CONFIGURATION = "oozie.coord.el.conf"; 045 // INSTANCE_SEPARATOR is used to separate multiple directories into one tag. 046 final public static String INSTANCE_SEPARATOR = "#"; 047 final public static String DIR_SEPARATOR = ","; 048 // TODO: in next release, support flexibility 049 private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS"; 050 051 /** 052 * Used in defining the frequency in 'day' unit. <p/> domain: <code> val > 0</code> and should be integer. 053 * 054 * @param val frequency in number of days. 055 * @return number of days and also set the frequency timeunit to "day" 056 */ 057 public static int ph1_coord_days(int val) { 058 val = ParamChecker.checkGTZero(val, "n"); 059 ELEvaluator eval = ELEvaluator.getCurrent(); 060 eval.setVariable("timeunit", TimeUnit.DAY); 061 eval.setVariable("endOfDuration", TimeUnit.NONE); 062 return val; 063 } 064 065 /** 066 * Used in defining the frequency in 'month' unit. <p/> domain: <code> val > 0</code> and should be integer. 067 * 068 * @param val frequency in number of months. 069 * @return number of months and also set the frequency timeunit to "month" 070 */ 071 public static int ph1_coord_months(int val) { 072 val = ParamChecker.checkGTZero(val, "n"); 073 ELEvaluator eval = ELEvaluator.getCurrent(); 074 eval.setVariable("timeunit", TimeUnit.MONTH); 075 eval.setVariable("endOfDuration", TimeUnit.NONE); 076 return val; 077 } 078 079 /** 080 * Used in defining the frequency in 'hour' unit. <p/> parameter value domain: <code> val > 0</code> and should 081 * be integer. 082 * 083 * @param val frequency in number of hours. 084 * @return number of minutes and also set the frequency timeunit to "minute" 085 */ 086 public static int ph1_coord_hours(int val) { 087 val = ParamChecker.checkGTZero(val, "n"); 088 ELEvaluator eval = ELEvaluator.getCurrent(); 089 eval.setVariable("timeunit", TimeUnit.MINUTE); 090 eval.setVariable("endOfDuration", TimeUnit.NONE); 091 return val * 60; 092 } 093 094 /** 095 * Used in defining the frequency in 'minute' unit. <p/> domain: <code> val > 0</code> and should be integer. 096 * 097 * @param val frequency in number of minutes. 098 * @return number of minutes and also set the frequency timeunit to "minute" 099 */ 100 public static int ph1_coord_minutes(int val) { 101 val = ParamChecker.checkGTZero(val, "n"); 102 ELEvaluator eval = ELEvaluator.getCurrent(); 103 eval.setVariable("timeunit", TimeUnit.MINUTE); 104 eval.setVariable("endOfDuration", TimeUnit.NONE); 105 return val; 106 } 107 108 /** 109 * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p/> Every instance will 110 * start at 00:00 hour of each day. <p/> domain: <code> val > 0</code> and should be integer. 111 * 112 * @param val frequency in number of days. 113 * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day" 114 */ 115 public static int ph1_coord_endOfDays(int val) { 116 val = ParamChecker.checkGTZero(val, "n"); 117 ELEvaluator eval = ELEvaluator.getCurrent(); 118 eval.setVariable("timeunit", TimeUnit.DAY); 119 eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY); 120 return val; 121 } 122 123 /** 124 * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p/> Every instance will 125 * start at first day of each month at 00:00 hour. <p/> domain: <code> val > 0</code> and should be integer. 126 * 127 * @param val: frequency in number of months. 128 * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month" 129 */ 130 public static int ph1_coord_endOfMonths(int val) { 131 val = ParamChecker.checkGTZero(val, "n"); 132 ELEvaluator eval = ELEvaluator.getCurrent(); 133 eval.setVariable("timeunit", TimeUnit.MONTH); 134 eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH); 135 return val; 136 } 137 138 /** 139 * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p/> Depends on: <p/> 140 * 1. Timezone of both dataset and job <p/> 2. Action creation Time 141 * 142 * @return difference in minutes (DataSet TZ Offset - Application TZ offset) 143 */ 144 public static int ph2_coord_tzOffset() { 145 Date actionCreationTime = getActionCreationtime(); 146 TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ"); 147 TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ"); 148 // Apply the TZ into Calendar object 149 Calendar dsTime = Calendar.getInstance(dsTZ); 150 dsTime.setTime(actionCreationTime); 151 Calendar jobTime = Calendar.getInstance(jobTZ); 152 jobTime.setTime(actionCreationTime); 153 return (dsTime.get(Calendar.ZONE_OFFSET) - jobTime.get(Calendar.ZONE_OFFSET)) / (1000 * 60); 154 } 155 156 public static int ph3_coord_tzOffset() { 157 return ph2_coord_tzOffset(); 158 } 159 160 /** 161 * Returns the a date string while given a base date in 'strBaseDate', 162 * offset and unit (e.g. DAY, MONTH, HOUR, MINUTE, MONTH). 163 * 164 * @param strBaseDate -- base date 165 * @param offset -- any number 166 * @param unit -- DAY, MONTH, HOUR, MINUTE, MONTH 167 * @return date string 168 * @throws Exception 169 */ 170 public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception { 171 Calendar baseCalDate = DateUtils.getCalendar(strBaseDate); 172 StringBuilder buffer = new StringBuilder(); 173 baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset); 174 buffer.append(DateUtils.formatDateUTC(baseCalDate)); 175 return buffer.toString(); 176 } 177 178 public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception { 179 return ph2_coord_dateOffset(strBaseDate, offset, unit); 180 } 181 182 /** 183 * Determine the date-time in UTC of n-th future available dataset instance 184 * from nominal Time but not beyond the instance specified as 'instance. 185 * <p/> 186 * It depends on: 187 * <p/> 188 * 1. Data set frequency 189 * <p/> 190 * 2. Data set Time unit (day, month, minute) 191 * <p/> 192 * 3. Data set Time zone/DST 193 * <p/> 194 * 4. End Day/Month flag 195 * <p/> 196 * 5. Data set initial instance 197 * <p/> 198 * 6. Action Creation Time 199 * <p/> 200 * 7. Existence of dataset's directory 201 * 202 * @param n :instance count 203 * <p/> 204 * domain: n >= 0, n is integer 205 * @param instance: How many future instance it should check? value should 206 * be >=0 207 * @return date-time in UTC of the n-th instance 208 * <p/> 209 * @throws Exception 210 */ 211 public static String ph3_coord_future(int n, int instance) throws Exception { 212 ParamChecker.checkGEZero(n, "future:n"); 213 ParamChecker.checkGTZero(instance, "future:instance"); 214 if (isSyncDataSet()) {// For Sync Dataset 215 return coord_future_sync(n, instance); 216 } 217 else { 218 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 219 } 220 } 221 222 private static String coord_future_sync(int n, int instance) throws Exception { 223 ELEvaluator eval = ELEvaluator.getCurrent(); 224 String retVal = ""; 225 int datasetFrequency = (int) getDSFrequency();// in minutes 226 TimeUnit dsTimeUnit = getDSTimeUnit(); 227 int[] instCount = new int[1]; 228 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount); 229 if (nominalInstanceCal != null) { 230 Calendar initInstance = getInitialInstanceCal(); 231 nominalInstanceCal = (Calendar) initInstance.clone(); 232 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 233 234 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 235 if (ds == null) { 236 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 237 } 238 String uriTemplate = ds.getUriTemplate(); 239 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION); 240 if (conf == null) { 241 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION); 242 } 243 int available = 0, checkedInstance = 0; 244 boolean resolved = false; 245 String user = ParamChecker 246 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); 247 String doneFlag = ds.getDoneFlag(); 248 while (instance >= checkedInstance) { 249 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal); 250 String uriPath = uriEval.evaluate(uriTemplate, String.class); 251 String pathWithDoneFlag = uriPath; 252 if (doneFlag.length() > 0) { 253 pathWithDoneFlag += "/" + doneFlag; 254 } 255 if (isPathAvailable(pathWithDoneFlag, user, null, conf)) { 256 XLog.getLog(CoordELFunctions.class).debug("Found future(" + available + "): " + pathWithDoneFlag); 257 if (available == n) { 258 XLog.getLog(CoordELFunctions.class).debug("Found future File: " + pathWithDoneFlag); 259 resolved = true; 260 retVal = DateUtils.formatDateUTC(nominalInstanceCal); 261 eval.setVariable("resolved_path", uriPath); 262 break; 263 } 264 available++; 265 } 266 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), 267 // -datasetFrequency); 268 nominalInstanceCal = (Calendar) initInstance.clone(); 269 instCount[0]++; 270 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 271 checkedInstance++; 272 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 273 } 274 if (!resolved) { 275 // return unchanged future function with variable 'is_resolved' 276 // to 'false' 277 eval.setVariable("is_resolved", Boolean.FALSE); 278 retVal = "${coord:future(" + n + ", " + instance + ")}"; 279 } 280 else { 281 eval.setVariable("is_resolved", Boolean.TRUE); 282 } 283 } 284 else {// No feasible nominal time 285 eval.setVariable("is_resolved", Boolean.TRUE); 286 retVal = ""; 287 } 288 return retVal; 289 } 290 291 /** 292 * Return nominal time or Action Creation Time. 293 * <p/> 294 * 295 * @return coordinator action creation or materialization date time 296 * @throws Exception if unable to format the Date object to String 297 */ 298 public static String ph2_coord_nominalTime() throws Exception { 299 ELEvaluator eval = ELEvaluator.getCurrent(); 300 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 301 "Coordinator Action"); 302 return DateUtils.formatDateUTC(action.getNominalTime()); 303 } 304 305 public static String ph3_coord_nominalTime() throws Exception { 306 return ph2_coord_nominalTime(); 307 } 308 309 /** 310 * Convert from standard date-time formatting to a desired format. 311 * <p/> 312 * @param dateTimeStr - A timestamp in standard (ISO8601) format. 313 * @param format - A string representing the desired format. 314 * @return coordinator action creation or materialization date time 315 * @throws Exception if unable to format the Date object to String 316 */ 317 public static String ph2_coord_formatTime(String dateTimeStr, String format) 318 throws Exception { 319 Date dateTime = DateUtils.parseDateUTC(dateTimeStr); 320 return DateUtils.formatDateCustom(dateTime, format); 321 } 322 323 public static String ph3_coord_formatTime(String dateTimeStr, String format) 324 throws Exception { 325 return ph2_coord_formatTime(dateTimeStr, format); 326 } 327 328 /** 329 * Return Action Id. <p/> 330 * 331 * @return coordinator action Id 332 */ 333 public static String ph2_coord_actionId() throws Exception { 334 ELEvaluator eval = ELEvaluator.getCurrent(); 335 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 336 "Coordinator Action"); 337 return action.getActionId(); 338 } 339 340 public static String ph3_coord_actionId() throws Exception { 341 return ph2_coord_actionId(); 342 } 343 344 /** 345 * Return Job Name. <p/> 346 * 347 * @return coordinator name 348 */ 349 public static String ph2_coord_name() throws Exception { 350 ELEvaluator eval = ELEvaluator.getCurrent(); 351 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 352 "Coordinator Action"); 353 return action.getName(); 354 } 355 356 public static String ph3_coord_name() throws Exception { 357 return ph2_coord_name(); 358 } 359 360 /** 361 * Return Action Start time. <p/> 362 * 363 * @return coordinator action start time 364 * @throws Exception if unable to format the Date object to String 365 */ 366 public static String ph2_coord_actualTime() throws Exception { 367 ELEvaluator eval = ELEvaluator.getCurrent(); 368 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 369 if (coordAction == null) { 370 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 371 } 372 return DateUtils.formatDateUTC(coordAction.getActualTime()); 373 } 374 375 public static String ph3_coord_actualTime() throws Exception { 376 return ph2_coord_actualTime(); 377 } 378 379 /** 380 * Used to specify a list of URI's that are used as input dir to the workflow job. <p/> Look for two evaluator-level 381 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of 382 * URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something 383 * unresolved, this function will echo back the original function <p/> otherwise it sends the uris. 384 * 385 * @param dataInName : Datain name 386 * @return the list of URI's separated by INSTANCE_SEPARATOR <p/> if there are unresolved EL function (i.e. latest) 387 * , echo back <p/> the function without resolving the function. 388 */ 389 public static String ph3_coord_dataIn(String dataInName) { 390 String uris = ""; 391 ELEvaluator eval = ELEvaluator.getCurrent(); 392 uris = (String) eval.getVariable(".datain." + dataInName); 393 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); 394 if (unresolved != null && unresolved.booleanValue() == true) { 395 return "${coord:dataIn('" + dataInName + "')}"; 396 } 397 return uris; 398 } 399 400 /** 401 * Used to specify a list of URI's that are output dir of the workflow job. <p/> Look for one evaluator-level 402 * variable <p/> dataout.<DATAOUT_NAME> <p/> It defines the current list of URI. <p/> otherwise it sends the uris. 403 * 404 * @param dataOutName : Dataout name 405 * @return the list of URI's separated by INSTANCE_SEPARATOR 406 */ 407 public static String ph3_coord_dataOut(String dataOutName) { 408 String uris = ""; 409 ELEvaluator eval = ELEvaluator.getCurrent(); 410 uris = (String) eval.getVariable(".dataout." + dataOutName); 411 return uris; 412 } 413 414 /** 415 * Determine the date-time in UTC of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency <p/> 2. 416 * Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. Data 417 * set initial instance <p/> 6. Action Creation Time 418 * 419 * @param n instance count domain: n is integer 420 * @return date-time in UTC of the n-th instance returns 'null' means n-th instance is earlier than Initial-Instance 421 * of DS 422 * @throws Exception 423 */ 424 public static String ph2_coord_current(int n) throws Exception { 425 if (isSyncDataSet()) { // For Sync Dataset 426 return coord_current_sync(n); 427 } 428 else { 429 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 430 } 431 } 432 433 /** 434 * Determine how many hours is on the date of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency 435 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. 436 * Data set initial instance <p/> 6. Action Creation Time 437 * 438 * @param n instance count <p/> domain: n is integer 439 * @return number of hours on that day <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS 440 * @throws Exception 441 */ 442 public static int ph2_coord_hoursInDay(int n) throws Exception { 443 int datasetFrequency = (int) getDSFrequency(); 444 // /Calendar nominalInstanceCal = 445 // getCurrentInstance(getActionCreationtime()); 446 Calendar nominalInstanceCal = getEffectiveNominalTime(); 447 if (nominalInstanceCal == null) { 448 return -1; 449 } 450 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n); 451 /* 452 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) 453 * { return -1; } 454 */ 455 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ 456 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 457 return DateUtils.hoursInDay(nominalInstanceCal); 458 } 459 460 public static int ph3_coord_hoursInDay(int n) throws Exception { 461 return ph2_coord_hoursInDay(n); 462 } 463 464 /** 465 * Calculate number of days in one month for n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency . 466 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. 467 * Data set initial instance <p/> 6. Action Creation Time 468 * 469 * @param n instance count. domain: n is integer 470 * @return number of days in that month <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS 471 * @throws Exception 472 */ 473 public static int ph2_coord_daysInMonth(int n) throws Exception { 474 int datasetFrequency = (int) getDSFrequency();// in minutes 475 // Calendar nominalInstanceCal = 476 // getCurrentInstance(getActionCreationtime()); 477 Calendar nominalInstanceCal = getEffectiveNominalTime(); 478 if (nominalInstanceCal == null) { 479 return -1; 480 } 481 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n); 482 /* 483 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) 484 * { return -1; } 485 */ 486 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ 487 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 488 return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH); 489 } 490 491 public static int ph3_coord_daysInMonth(int n) throws Exception { 492 return ph2_coord_daysInMonth(n); 493 } 494 495 /** 496 * Determine the date-time in UTC of n-th latest available dataset instance. <p/> It depends on: <p/> 1. Data set 497 * frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month 498 * flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of dataset's directory 499 * 500 * @param n :instance count <p/> domain: n > 0, n is integer 501 * @return date-time in UTC of the n-th instance <p/> returns 'null' means n-th instance is earlier than 502 * Initial-Instance of DS 503 * @throws Exception 504 */ 505 public static String ph3_coord_latest(int n) throws Exception { 506 if (n > 0) { 507 throw new IllegalArgumentException("paramter should be <= 0 but it is " + n); 508 } 509 if (isSyncDataSet()) {// For Sync Dataset 510 return coord_latest_sync(n); 511 } 512 else { 513 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 514 } 515 } 516 517 /** 518 * Configure an evaluator with data set and application specific information. <p/> Helper method of associating 519 * dataset and application object 520 * 521 * @param evaluator : to set variables 522 * @param ds : Data Set object 523 * @param coordAction : Application instance 524 */ 525 public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) { 526 evaluator.setVariable(COORD_ACTION, coordAction); 527 evaluator.setVariable(DATASET, ds); 528 } 529 530 /** 531 * Helper method to wrap around with "${..}". <p/> 532 * 533 * 534 * @param eval :EL evaluator 535 * @param expr : expression to evaluate 536 * @return Resolved expression or echo back the same expression 537 * @throws Exception 538 */ 539 public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception { 540 try { 541 eval.setVariable(".wrap", null); 542 String result = eval.evaluate(expr, String.class); 543 if (eval.getVariable(".wrap") != null) { 544 return "${" + result + "}"; 545 } 546 else { 547 return result; 548 } 549 } 550 catch (Exception e) { 551 throw new Exception("Unable to evaluate :" + expr + ":\n", e); 552 } 553 } 554 555 // Set of echo functions 556 557 public static String ph1_coord_current_echo(String n) { 558 return echoUnResolved("current", n); 559 } 560 561 public static String ph2_coord_current_echo(String n) { 562 return echoUnResolved("current", n); 563 } 564 565 public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) { 566 return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit); 567 } 568 569 public static String ph1_coord_formatTime_echo(String dateTime, String format) { 570 // Quote the dateTime value since it would contain a ':'. 571 return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format); 572 } 573 574 public static String ph1_coord_latest_echo(String n) { 575 return echoUnResolved("latest", n); 576 } 577 578 public static String ph2_coord_latest_echo(String n) { 579 return ph1_coord_latest_echo(n); 580 } 581 582 public static String ph1_coord_future_echo(String n, String instance) { 583 return echoUnResolved("future", n + ", " + instance + ""); 584 } 585 586 public static String ph2_coord_future_echo(String n, String instance) { 587 return ph1_coord_future_echo(n, instance); 588 } 589 590 public static String ph1_coord_dataIn_echo(String n) { 591 ELEvaluator eval = ELEvaluator.getCurrent(); 592 String val = (String) eval.getVariable("oozie.dataname." + n); 593 if (val == null || val.equals("data-in") == false) { 594 XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid"); 595 throw new RuntimeException("data_in_name " + n + " is not valid"); 596 } 597 return echoUnResolved("dataIn", "'" + n + "'"); 598 } 599 600 public static String ph1_coord_dataOut_echo(String n) { 601 ELEvaluator eval = ELEvaluator.getCurrent(); 602 String val = (String) eval.getVariable("oozie.dataname." + n); 603 if (val == null || val.equals("data-out") == false) { 604 XLog.getLog(CoordELFunctions.class).error("data_out_name " + n + " is not valid"); 605 throw new RuntimeException("data_out_name " + n + " is not valid"); 606 } 607 return echoUnResolved("dataOut", "'" + n + "'"); 608 } 609 610 public static String ph1_coord_nominalTime_echo() { 611 return echoUnResolved("nominalTime", ""); 612 } 613 614 public static String ph1_coord_nominalTime_echo_wrap() { 615 // return "${coord:nominalTime()}"; // no resolution 616 return echoUnResolved("nominalTime", ""); 617 } 618 619 public static String ph1_coord_nominalTime_echo_fixed() { 620 return "2009-03-06T010:00"; // Dummy resolution 621 } 622 623 public static String ph1_coord_actualTime_echo_wrap() { 624 // return "${coord:actualTime()}"; // no resolution 625 return echoUnResolved("actualTime", ""); 626 } 627 628 public static String ph1_coord_actionId_echo() { 629 return echoUnResolved("actionId", ""); 630 } 631 632 public static String ph1_coord_name_echo() { 633 return echoUnResolved("name", ""); 634 } 635 636 // The following echo functions are not used in any phases yet 637 // They are here for future purpose. 638 public static String coord_minutes_echo(String n) { 639 return echoUnResolved("minutes", n); 640 } 641 642 public static String coord_hours_echo(String n) { 643 return echoUnResolved("hours", n); 644 } 645 646 public static String coord_days_echo(String n) { 647 return echoUnResolved("days", n); 648 } 649 650 public static String coord_endOfDay_echo(String n) { 651 return echoUnResolved("endOfDay", n); 652 } 653 654 public static String coord_months_echo(String n) { 655 return echoUnResolved("months", n); 656 } 657 658 public static String coord_endOfMonth_echo(String n) { 659 return echoUnResolved("endOfMonth", n); 660 } 661 662 public static String coord_actualTime_echo() { 663 return echoUnResolved("actualTime", ""); 664 } 665 666 // This echo function will always return "24" for validation only. 667 // This evaluation ****should not**** replace the original XML 668 // Create a temporary string and validate the function 669 // This is **required** for evaluating an expression like 670 // coord:HoursInDay(0) + 3 671 // actual evaluation will happen in phase 2 or phase 3. 672 public static String ph1_coord_hoursInDay_echo(String n) { 673 return "24"; 674 // return echoUnResolved("hoursInDay", n); 675 } 676 677 // This echo function will always return "30" for validation only. 678 // This evaluation ****should not**** replace the original XML 679 // Create a temporary string and validate the function 680 // This is **required** for evaluating an expression like 681 // coord:daysInMonth(0) + 3 682 // actual evaluation will happen in phase 2 or phase 3. 683 public static String ph1_coord_daysInMonth_echo(String n) { 684 // return echoUnResolved("daysInMonth", n); 685 return "30"; 686 } 687 688 // This echo function will always return "3" for validation only. 689 // This evaluation ****should not**** replace the original XML 690 // Create a temporary string and validate the function 691 // This is **required** for evaluating an expression like coord:tzOffset + 2 692 // actual evaluation will happen in phase 2 or phase 3. 693 public static String ph1_coord_tzOffset_echo() { 694 // return echoUnResolved("tzOffset", ""); 695 return "3"; 696 } 697 698 // Local methods 699 /** 700 * @param n 701 * @return n-th instance Date-Time from current instance for data-set <p/> return empty string ("") if the 702 * Action_Creation_time or the n-th instance <p/> is earlier than the Initial_Instance of dataset. 703 * @throws Exception 704 */ 705 private static String coord_current_sync(int n) throws Exception { 706 int datasetFrequency = getDSFrequency();// in minutes 707 TimeUnit dsTimeUnit = getDSTimeUnit(); 708 int[] instCount = new int[1];// used as pass by ref 709 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount); 710 if (nominalInstanceCal == null) { 711 XLog.getLog(CoordELFunctions.class) 712 .warn("If the initial instance of the dataset is later than the nominal time, an empty string is returned. This means that no data is available at the current-instance specified by the user and the user could try modifying his initial-instance to an earlier time."); 713 return ""; 714 } 715 nominalInstanceCal = getInitialInstanceCal(); 716 int absInstanceCount = instCount[0] + n; 717 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency * absInstanceCount); 718 719 if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) { 720 XLog.getLog(CoordELFunctions.class) 721 .warn("If the initial instance of the dataset is later than the current-instance specified, such as coord:current({0}) in this case, an empty string is returned. This means that no data is available at the current-instance specified by the user and the user could try modifying his initial-instance to an earlier time.", n); 722 return ""; 723 } 724 String str = DateUtils.formatDateUTC(nominalInstanceCal); 725 return str; 726 } 727 728 /** 729 * @param offset 730 * @return n-th available latest instance Date-Time for SYNC data-set 731 * @throws Exception 732 */ 733 private static String coord_latest_sync(int offset) throws Exception { 734 if (offset > 0) { 735 throw new RuntimeException("For latest there is no meaning " + "of positive instance. n should be <=0" 736 + offset); 737 } 738 ELEvaluator eval = ELEvaluator.getCurrent(); 739 String retVal = ""; 740 int datasetFrequency = (int) getDSFrequency();// in minutes 741 TimeUnit dsTimeUnit = getDSTimeUnit(); 742 int[] instCount = new int[1]; 743 Calendar nominalInstanceCal = getCurrentInstance(getActualTime(), instCount); 744 if (nominalInstanceCal != null) { 745 Calendar initInstance = getInitialInstanceCal(); 746 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 747 if (ds == null) { 748 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 749 } 750 String uriTemplate = ds.getUriTemplate(); 751 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION); 752 if (conf == null) { 753 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION); 754 } 755 int available = 0; 756 boolean resolved = false; 757 String user = ParamChecker 758 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); 759 String doneFlag = ds.getDoneFlag(); 760 while (nominalInstanceCal.compareTo(initInstance) >= 0) { 761 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal); 762 String uriPath = uriEval.evaluate(uriTemplate, String.class); 763 String pathWithDoneFlag = uriPath; 764 if (doneFlag.length() > 0) { 765 pathWithDoneFlag += "/" + doneFlag; 766 } 767 if (isPathAvailable(pathWithDoneFlag, user, null, conf)) { 768 XLog.getLog(CoordELFunctions.class).debug("Found latest(" + available + "): " + pathWithDoneFlag); 769 if (available == offset) { 770 XLog.getLog(CoordELFunctions.class).debug("Found Latest File: " + pathWithDoneFlag); 771 resolved = true; 772 retVal = DateUtils.formatDateUTC(nominalInstanceCal); 773 eval.setVariable("resolved_path", uriPath); 774 break; 775 } 776 777 available--; 778 } 779 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), 780 // -datasetFrequency); 781 nominalInstanceCal = (Calendar) initInstance.clone(); 782 instCount[0]--; 783 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 784 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 785 } 786 if (!resolved) { 787 // return unchanged latest function with variable 'is_resolved' 788 // to 'false' 789 eval.setVariable("is_resolved", Boolean.FALSE); 790 retVal = "${coord:latest(" + offset + ")}"; 791 } 792 else { 793 eval.setVariable("is_resolved", Boolean.TRUE); 794 } 795 } 796 else {// No feasible nominal time 797 eval.setVariable("is_resolved", Boolean.FALSE); 798 } 799 return retVal; 800 } 801 802 // TODO : Not an efficient way. In a loop environment, we could do something 803 // outside the loop 804 /** 805 * Check whether a URI path exists 806 * 807 * @param sPath 808 * @param conf 809 * @return 810 * @throws IOException 811 */ 812 813 private static boolean isPathAvailable(String sPath, String user, String group, Configuration conf) 814 throws IOException, HadoopAccessorException { 815 // sPath += "/" + END_OF_OPERATION_INDICATOR_FILE; 816 Path path = new Path(sPath); 817 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 818 Configuration fsConf = has.createJobConf(path.toUri().getAuthority()); 819 return has.createFileSystem(user, path.toUri(), fsConf).exists(path); 820 } 821 822 /** 823 * @param tm 824 * @return a new Evaluator to be used for URI-template evaluation 825 */ 826 private static ELEvaluator getUriEvaluator(Calendar tm) { 827 ELEvaluator retEval = new ELEvaluator(); 828 retEval.setVariable("YEAR", tm.get(Calendar.YEAR)); 829 retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm 830 .get(Calendar.MONTH) + 1)); 831 retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm 832 .get(Calendar.DAY_OF_MONTH)); 833 retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm 834 .get(Calendar.HOUR_OF_DAY)); 835 retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? "0" + tm.get(Calendar.MINUTE) : tm 836 .get(Calendar.MINUTE)); 837 return retEval; 838 } 839 840 /** 841 * @return whether a data set is SYNCH or ASYNC 842 */ 843 private static boolean isSyncDataSet() { 844 ELEvaluator eval = ELEvaluator.getCurrent(); 845 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 846 if (ds == null) { 847 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 848 } 849 return ds.getType().equalsIgnoreCase("SYNC"); 850 } 851 852 /** 853 * Check whether a function should be resolved. 854 * 855 * @param functionName 856 * @param n 857 * @return null if the functionName needs to be resolved otherwise return the calling function unresolved. 858 */ 859 private static String checkIfResolved(String functionName, String n) { 860 ELEvaluator eval = ELEvaluator.getCurrent(); 861 String replace = (String) eval.getVariable("resolve_" + functionName); 862 if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't 863 // resolve 864 // return "${coord:" + functionName + "(" + n +")}"; //Unresolved 865 eval.setVariable(".wrap", "true"); 866 return "coord:" + functionName + "(" + n + ")"; // Unresolved 867 } 868 return null; // Resolved it 869 } 870 871 private static String echoUnResolved(String functionName, String n) { 872 return echoUnResolvedPre(functionName, n, "coord:"); 873 } 874 875 private static String echoUnResolvedPre(String functionName, String n, String prefix) { 876 ELEvaluator eval = ELEvaluator.getCurrent(); 877 eval.setVariable(".wrap", "true"); 878 return prefix + functionName + "(" + n + ")"; // Unresolved 879 } 880 881 /** 882 * @return the initial instance of a DataSet in DATE 883 */ 884 private static Date getInitialInstance() { 885 return getInitialInstanceCal().getTime(); 886 // return ds.getInitInstance(); 887 } 888 889 /** 890 * @return the initial instance of a DataSet in Calendar 891 */ 892 private static Calendar getInitialInstanceCal() { 893 ELEvaluator eval = ELEvaluator.getCurrent(); 894 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 895 if (ds == null) { 896 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 897 } 898 Calendar effInitTS = Calendar.getInstance(); 899 effInitTS.setTime(ds.getInitInstance()); 900 effInitTS.setTimeZone(ds.getTimeZone()); 901 // To adjust EOD/EOM 902 DateUtils.moveToEnd(effInitTS, getDSEndOfFlag()); 903 return effInitTS; 904 // return ds.getInitInstance(); 905 } 906 907 /** 908 * @return Nominal or action creation Time when all the dependencies of an application instance are met. 909 */ 910 private static Date getActionCreationtime() { 911 ELEvaluator eval = ELEvaluator.getCurrent(); 912 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 913 if (coordAction == null) { 914 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 915 } 916 return coordAction.getNominalTime(); 917 } 918 919 /** 920 * @return Actual Time when all the dependencies of an application instance are met. 921 */ 922 private static Date getActualTime() { 923 ELEvaluator eval = ELEvaluator.getCurrent(); 924 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 925 if (coordAction == null) { 926 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 927 } 928 return coordAction.getActualTime(); 929 } 930 931 /** 932 * @return TimeZone for the application or job. 933 */ 934 private static TimeZone getJobTZ() { 935 ELEvaluator eval = ELEvaluator.getCurrent(); 936 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 937 if (coordAction == null) { 938 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 939 } 940 return coordAction.getTimeZone(); 941 } 942 943 /** 944 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time) 945 * 946 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of 947 * the dataset. 948 */ 949 private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) { 950 Date datasetInitialInstance = getInitialInstance(); 951 TimeUnit dsTimeUnit = getDSTimeUnit(); 952 TimeZone dsTZ = getDatasetTZ(); 953 // Convert Date to Calendar for corresponding TZ 954 Calendar current = Calendar.getInstance(); 955 current.setTime(datasetInitialInstance); 956 current.setTimeZone(dsTZ); 957 958 Calendar calEffectiveTime = Calendar.getInstance(); 959 calEffectiveTime.setTime(effectiveTime); 960 calEffectiveTime.setTimeZone(dsTZ); 961 instanceCount[0] = 0; 962 if (current.compareTo(calEffectiveTime) > 0) { 963 // Nominal Time < initial Instance 964 // TODO: getClass() call doesn't work from static method. 965 // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+ 966 // current.getTime()); 967 return null; 968 } 969 Calendar origCurrent = (Calendar) current.clone(); 970 while (current.compareTo(calEffectiveTime) <= 0) { 971 current = (Calendar) origCurrent.clone(); 972 instanceCount[0]++; 973 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * getDSFrequency()); 974 } 975 instanceCount[0]--; 976 977 current = (Calendar) origCurrent.clone(); 978 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * getDSFrequency()); 979 return current; 980 } 981 982 private static Calendar getEffectiveNominalTime() { 983 Date datasetInitialInstance = getInitialInstance(); 984 TimeZone dsTZ = getDatasetTZ(); 985 // Convert Date to Calendar for corresponding TZ 986 Calendar current = Calendar.getInstance(); 987 current.setTime(datasetInitialInstance); 988 current.setTimeZone(dsTZ); 989 990 Calendar calEffectiveTime = Calendar.getInstance(); 991 calEffectiveTime.setTime(getActionCreationtime()); 992 calEffectiveTime.setTimeZone(dsTZ); 993 if (current.compareTo(calEffectiveTime) > 0) { 994 // Nominal Time < initial Instance 995 // TODO: getClass() call doesn't work from static method. 996 // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+ 997 // current.getTime()); 998 return null; 999 } 1000 return calEffectiveTime; 1001 } 1002 1003 /** 1004 * @return dataset frequency in minutes 1005 */ 1006 private static int getDSFrequency() { 1007 ELEvaluator eval = ELEvaluator.getCurrent(); 1008 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1009 if (ds == null) { 1010 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1011 } 1012 return ds.getFrequency(); 1013 } 1014 1015 /** 1016 * @return dataset TimeUnit 1017 */ 1018 private static TimeUnit getDSTimeUnit() { 1019 ELEvaluator eval = ELEvaluator.getCurrent(); 1020 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1021 if (ds == null) { 1022 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1023 } 1024 return ds.getTimeUnit(); 1025 } 1026 1027 /** 1028 * @return dataset TimeZone 1029 */ 1030 private static TimeZone getDatasetTZ() { 1031 ELEvaluator eval = ELEvaluator.getCurrent(); 1032 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1033 if (ds == null) { 1034 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1035 } 1036 return ds.getTimeZone(); 1037 } 1038 1039 /** 1040 * @return dataset TimeUnit 1041 */ 1042 private static TimeUnit getDSEndOfFlag() { 1043 ELEvaluator eval = ELEvaluator.getCurrent(); 1044 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1045 if (ds == null) { 1046 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1047 } 1048 return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration(); 1049 } 1050 1051 /** 1052 * Return a job configuration property for the coordinator. 1053 * 1054 * @param property property name. 1055 * @return the value of the property, <code>null</code> if the property is undefined. 1056 */ 1057 public static String coord_conf(String property) { 1058 ELEvaluator eval = ELEvaluator.getCurrent(); 1059 return (String) eval.getVariable(property); 1060 } 1061 1062 /** 1063 * Return the user that submitted the coordinator job. 1064 * 1065 * @return the user that submitted the coordinator job. 1066 */ 1067 public static String coord_user() { 1068 ELEvaluator eval = ELEvaluator.getCurrent(); 1069 return (String) eval.getVariable(OozieClient.USER_NAME); 1070 } 1071 }