/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.coord;

import java.io.IOException;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.oozie.client.OozieClient;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.HadoopAccessorService;

/**
 * This class implements the EL functions related to coordinators.
 * <p/>
 * Function names carry a phase prefix ({@code ph1_}, {@code ph2_}, {@code ph3_}) naming the evaluation
 * phase they serve. The {@code *_echo} variants do not evaluate anything; they echo the original call
 * back so that it can be evaluated in a later phase. All state (dataset, action, configuration, flags
 * such as "is_resolved") is exchanged through variables of the current {@link ELEvaluator}.
 */

public class CoordELFunctions {
// Evaluator variable under which the SyncCoordDataset bean is stored (see configureEvaluator).
final private static String DATASET = "oozie.coord.el.dataset.bean";
// Evaluator variable under which the SyncCoordAction bean is stored (see configureEvaluator).
final private static String COORD_ACTION = "oozie.coord.el.app.bean";
// Evaluator variable holding the Hadoop Configuration passed to the dataset existence checks.
final public static String CONFIGURATION = "oozie.coord.el.conf";
// INSTANCE_SEPARATOR is used to separate multiple directories into one tag.
046 final public static String INSTANCE_SEPARATOR = "#"; 047 final public static String DIR_SEPARATOR = ","; 048 // TODO: in next release, support flexibility 049 private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS"; 050 051 /** 052 * Used in defining the frequency in 'day' unit. <p/> domain: <code> val > 0</code> and should be integer. 053 * 054 * @param val frequency in number of days. 055 * @return number of days and also set the frequency timeunit to "day" 056 */ 057 public static int ph1_coord_days(int val) { 058 val = ParamChecker.checkGTZero(val, "n"); 059 ELEvaluator eval = ELEvaluator.getCurrent(); 060 eval.setVariable("timeunit", TimeUnit.DAY); 061 eval.setVariable("endOfDuration", TimeUnit.NONE); 062 return val; 063 } 064 065 /** 066 * Used in defining the frequency in 'month' unit. <p/> domain: <code> val > 0</code> and should be integer. 067 * 068 * @param val frequency in number of months. 069 * @return number of months and also set the frequency timeunit to "month" 070 */ 071 public static int ph1_coord_months(int val) { 072 val = ParamChecker.checkGTZero(val, "n"); 073 ELEvaluator eval = ELEvaluator.getCurrent(); 074 eval.setVariable("timeunit", TimeUnit.MONTH); 075 eval.setVariable("endOfDuration", TimeUnit.NONE); 076 return val; 077 } 078 079 /** 080 * Used in defining the frequency in 'hour' unit. <p/> parameter value domain: <code> val > 0</code> and should 081 * be integer. 082 * 083 * @param val frequency in number of hours. 084 * @return number of minutes and also set the frequency timeunit to "minute" 085 */ 086 public static int ph1_coord_hours(int val) { 087 val = ParamChecker.checkGTZero(val, "n"); 088 ELEvaluator eval = ELEvaluator.getCurrent(); 089 eval.setVariable("timeunit", TimeUnit.MINUTE); 090 eval.setVariable("endOfDuration", TimeUnit.NONE); 091 return val * 60; 092 } 093 094 /** 095 * Used in defining the frequency in 'minute' unit. <p/> domain: <code> val > 0</code> and should be integer. 
096 * 097 * @param val frequency in number of minutes. 098 * @return number of minutes and also set the frequency timeunit to "minute" 099 */ 100 public static int ph1_coord_minutes(int val) { 101 val = ParamChecker.checkGTZero(val, "n"); 102 ELEvaluator eval = ELEvaluator.getCurrent(); 103 eval.setVariable("timeunit", TimeUnit.MINUTE); 104 eval.setVariable("endOfDuration", TimeUnit.NONE); 105 return val; 106 } 107 108 /** 109 * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p/> Every instance will 110 * start at 00:00 hour of each day. <p/> domain: <code> val > 0</code> and should be integer. 111 * 112 * @param val frequency in number of days. 113 * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day" 114 */ 115 public static int ph1_coord_endOfDays(int val) { 116 val = ParamChecker.checkGTZero(val, "n"); 117 ELEvaluator eval = ELEvaluator.getCurrent(); 118 eval.setVariable("timeunit", TimeUnit.DAY); 119 eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY); 120 return val; 121 } 122 123 /** 124 * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p/> Every instance will 125 * start at first day of each month at 00:00 hour. <p/> domain: <code> val > 0</code> and should be integer. 126 * 127 * @param val: frequency in number of months. 128 * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month" 129 */ 130 public static int ph1_coord_endOfMonths(int val) { 131 val = ParamChecker.checkGTZero(val, "n"); 132 ELEvaluator eval = ELEvaluator.getCurrent(); 133 eval.setVariable("timeunit", TimeUnit.MONTH); 134 eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH); 135 return val; 136 } 137 138 /** 139 * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p/> Depends on: <p/> 140 * 1. Timezone of both dataset and job <p/> 2. 
Action creation Time 141 * 142 * @return difference in minutes (DataSet TZ Offset - Application TZ offset) 143 */ 144 public static int ph2_coord_tzOffset() { 145 Date actionCreationTime = getActionCreationtime(); 146 TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ"); 147 TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ"); 148 // Apply the TZ into Calendar object 149 Calendar dsTime = Calendar.getInstance(dsTZ); 150 dsTime.setTime(actionCreationTime); 151 Calendar jobTime = Calendar.getInstance(jobTZ); 152 jobTime.setTime(actionCreationTime); 153 return (dsTime.get(Calendar.ZONE_OFFSET) - jobTime.get(Calendar.ZONE_OFFSET)) / (1000 * 60); 154 } 155 156 public static int ph3_coord_tzOffset() { 157 return ph2_coord_tzOffset(); 158 } 159 160 /** 161 * Returns the a date string while given a base date in 'strBaseDate', 162 * offset and unit (e.g. DAY, MONTH, HOUR, MINUTE, MONTH). 163 * 164 * @param strBaseDate -- base date 165 * @param offset -- any number 166 * @param unit -- DAY, MONTH, HOUR, MINUTE, MONTH 167 * @return date string 168 * @throws Exception 169 */ 170 public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception { 171 Calendar baseCalDate = DateUtils.getCalendar(strBaseDate); 172 StringBuilder buffer = new StringBuilder(); 173 baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset); 174 buffer.append(DateUtils.formatDateOozieTZ(baseCalDate)); 175 return buffer.toString(); 176 } 177 178 public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception { 179 return ph2_coord_dateOffset(strBaseDate, offset, unit); 180 } 181 182 /** 183 * Determine the date-time in Oozie processing timezone of n-th future available dataset instance 184 * from nominal Time but not beyond the instance specified as 'instance. 185 * <p/> 186 * It depends on: 187 * <p/> 188 * 1. Data set frequency 189 * <p/> 190 * 2. 
Data set Time unit (day, month, minute)
 * <p/>
 * 3. Data set Time zone/DST
 * <p/>
 * 4. End Day/Month flag
 * <p/>
 * 5. Data set initial instance
 * <p/>
 * 6. Action Creation Time
 * <p/>
 * 7. Existence of dataset's directory
 *
 * @param n index (0-based) of the wanted available instance among the checked ones
 * <p/>
 * domain: n >= 0, n is integer
 * @param instance how far ahead to look: candidates at positions 0..instance from the current
 * instance are checked (instance + 1 candidates). Validated with ParamChecker.checkGTZero.
 * @return date-time in Oozie processing timezone of the n-th available instance; otherwise the
 * unresolved expression {@code ${coord:future(n, instance)}} with evaluator variable
 * "is_resolved" = false, or "" (with "is_resolved" = true) when there is no current instance
 * <p/>
 * @throws UnsupportedOperationException if the dataset is not of type SYNC
 * @throws Exception
 */
public static String ph3_coord_future(int n, int instance) throws Exception {
    ParamChecker.checkGEZero(n, "future:n");
    ParamChecker.checkGTZero(instance, "future:instance");
    if (isSyncDataSet()) {// For Sync Dataset
        return coord_future_sync(n, instance);
    }
    else {
        throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
    }
}

/**
 * SYNC-dataset implementation of coord:future.
 * <p/>
 * Walks forward from the current instance (derived from the action creation time), one dataset
 * instance at a time, examining instance + 1 candidates. A candidate counts as available when its
 * URI (with "/" + done-flag appended if the dataset has a non-empty done-flag) exists. The n-th
 * available candidate is returned and its URI stored in the evaluator variable "resolved_path".
 * The evaluator variable "is_resolved" is set on every path.
 */
private static String coord_future_sync(int n, int instance) throws Exception {
    ELEvaluator eval = ELEvaluator.getCurrent();
    String retVal = "";
    // NOTE(review): used below as a count of dsTimeUnit calendar units, so it is only "minutes" for
    // minute-based datasets.
    int datasetFrequency = (int) getDSFrequency();// in minutes
    TimeUnit dsTimeUnit = getDSTimeUnit();
    // Single-element array used as an out-parameter: receives the index of the current instance,
    // which is then used as a multiplier of the frequency from the initial instance.
    int[] instCount = new int[1];
    Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
    if (nominalInstanceCal != null) {
        Calendar initInstance = getInitialInstanceCal();
        // Rebuild the current instance as: initial instance + index * frequency.
        nominalInstanceCal = (Calendar) initInstance.clone();
        nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);

        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
        if (ds == null) {
            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
        }
        String uriTemplate = ds.getUriTemplate();
        Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
        if (conf == null) {
            throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
        }
        // available: existing instances found so far; checkedInstance: candidates examined so far.
        int available = 0, checkedInstance = 0;
        boolean resolved = false;
        String user = ParamChecker
                .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
        // NOTE(review): assumes getDoneFlag() never returns null (length() is called on it).
        String doneFlag = ds.getDoneFlag();
        while (instance >= checkedInstance) {
            ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
            String uriPath = uriEval.evaluate(uriTemplate, String.class);
            String pathWithDoneFlag = uriPath;
            // With an empty done-flag the instance URI itself is checked for existence.
            if (doneFlag.length() > 0) {
                pathWithDoneFlag += "/" + doneFlag;
            }
            if (isPathAvailable(pathWithDoneFlag, user, null, conf)) {
                XLog.getLog(CoordELFunctions.class).debug("Found future(" + available + "): " + pathWithDoneFlag);
                if (available == n) {
                    XLog.getLog(CoordELFunctions.class).debug("Found future File: " + pathWithDoneFlag);
                    resolved = true;
                    retVal = DateUtils.formatDateOozieTZ(nominalInstanceCal);
                    eval.setVariable("resolved_path", uriPath);
                    break;
                }
                available++;
            }
            // Move to the next instance. It is recomputed from the initial instance instead of being
            // incremented in place; NOTE(review): presumably to avoid cumulative calendar drift.
            // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
            // -datasetFrequency);
            nominalInstanceCal = (Calendar) initInstance.clone();
            instCount[0]++;
            nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
            checkedInstance++;
            // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
        }
        if (!resolved) {
            // return unchanged future function with variable 'is_resolved'
            // to 'false'
            eval.setVariable("is_resolved", Boolean.FALSE);
            retVal = "${coord:future(" + n + ", " + instance + ")}";
        }
        else {
            eval.setVariable("is_resolved", Boolean.TRUE);
        }
    }
    else {// No feasible nominal time
        eval.setVariable("is_resolved", Boolean.TRUE);
        retVal = "";
    }
    return retVal;
}

/**
 * Return nominal time or Action Creation Time.
293 * <p/> 294 * 295 * @return coordinator action creation or materialization date time 296 * @throws Exception if unable to format the Date object to String 297 */ 298 public static String ph2_coord_nominalTime() throws Exception { 299 ELEvaluator eval = ELEvaluator.getCurrent(); 300 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 301 "Coordinator Action"); 302 return DateUtils.formatDateOozieTZ(action.getNominalTime()); 303 } 304 305 public static String ph3_coord_nominalTime() throws Exception { 306 return ph2_coord_nominalTime(); 307 } 308 309 /** 310 * Convert from standard date-time formatting to a desired format. 311 * <p/> 312 * @param dateTimeStr - A timestamp in standard (ISO8601) format. 313 * @param format - A string representing the desired format. 314 * @return coordinator action creation or materialization date time 315 * @throws Exception if unable to format the Date object to String 316 */ 317 public static String ph2_coord_formatTime(String dateTimeStr, String format) 318 throws Exception { 319 Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr); 320 return DateUtils.formatDateCustom(dateTime, format); 321 } 322 323 public static String ph3_coord_formatTime(String dateTimeStr, String format) 324 throws Exception { 325 return ph2_coord_formatTime(dateTimeStr, format); 326 } 327 328 /** 329 * Return Action Id. <p/> 330 * 331 * @return coordinator action Id 332 */ 333 public static String ph2_coord_actionId() throws Exception { 334 ELEvaluator eval = ELEvaluator.getCurrent(); 335 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 336 "Coordinator Action"); 337 return action.getActionId(); 338 } 339 340 public static String ph3_coord_actionId() throws Exception { 341 return ph2_coord_actionId(); 342 } 343 344 /** 345 * Return Job Name. 
<p/> 346 * 347 * @return coordinator name 348 */ 349 public static String ph2_coord_name() throws Exception { 350 ELEvaluator eval = ELEvaluator.getCurrent(); 351 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 352 "Coordinator Action"); 353 return action.getName(); 354 } 355 356 public static String ph3_coord_name() throws Exception { 357 return ph2_coord_name(); 358 } 359 360 /** 361 * Return Action Start time. <p/> 362 * 363 * @return coordinator action start time 364 * @throws Exception if unable to format the Date object to String 365 */ 366 public static String ph2_coord_actualTime() throws Exception { 367 ELEvaluator eval = ELEvaluator.getCurrent(); 368 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 369 if (coordAction == null) { 370 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 371 } 372 return DateUtils.formatDateOozieTZ(coordAction.getActualTime()); 373 } 374 375 public static String ph3_coord_actualTime() throws Exception { 376 return ph2_coord_actualTime(); 377 } 378 379 /** 380 * Used to specify a list of URI's that are used as input dir to the workflow job. <p/> Look for two evaluator-level 381 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of 382 * URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something 383 * unresolved, this function will echo back the original function <p/> otherwise it sends the uris. 384 * 385 * @param dataInName : Datain name 386 * @return the list of URI's separated by INSTANCE_SEPARATOR <p/> if there are unresolved EL function (i.e. latest) 387 * , echo back <p/> the function without resolving the function. 
388 */ 389 public static String ph3_coord_dataIn(String dataInName) { 390 String uris = ""; 391 ELEvaluator eval = ELEvaluator.getCurrent(); 392 uris = (String) eval.getVariable(".datain." + dataInName); 393 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); 394 if (unresolved != null && unresolved.booleanValue() == true) { 395 return "${coord:dataIn('" + dataInName + "')}"; 396 } 397 return uris; 398 } 399 400 /** 401 * Used to specify a list of URI's that are output dir of the workflow job. <p/> Look for one evaluator-level 402 * variable <p/> dataout.<DATAOUT_NAME> <p/> It defines the current list of URI. <p/> otherwise it sends the uris. 403 * 404 * @param dataOutName : Dataout name 405 * @return the list of URI's separated by INSTANCE_SEPARATOR 406 */ 407 public static String ph3_coord_dataOut(String dataOutName) { 408 String uris = ""; 409 ELEvaluator eval = ELEvaluator.getCurrent(); 410 uris = (String) eval.getVariable(".dataout." + dataOutName); 411 return uris; 412 } 413 414 /** 415 * Determine the date-time in Oozie processing timezone of n-th dataset instance. <p/> It depends on: <p/> 1. 416 * Data set frequency <p/> 2. 417 * Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. Data 418 * set initial instance <p/> 6. Action Creation Time 419 * 420 * @param n instance count domain: n is integer 421 * @return date-time in Oozie processing timezone of the n-th instance returns 'null' means n-th instance is 422 * earlier than Initial-Instance of DS 423 * @throws Exception 424 */ 425 public static String ph2_coord_current(int n) throws Exception { 426 if (isSyncDataSet()) { // For Sync Dataset 427 return coord_current_sync(n); 428 } 429 else { 430 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 431 } 432 } 433 434 /** 435 * Determine how many hours is on the date of n-th dataset instance. <p/> It depends on: <p/> 1. 
Data set frequency 436 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. 437 * Data set initial instance <p/> 6. Action Creation Time 438 * 439 * @param n instance count <p/> domain: n is integer 440 * @return number of hours on that day <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS 441 * @throws Exception 442 */ 443 public static int ph2_coord_hoursInDay(int n) throws Exception { 444 int datasetFrequency = (int) getDSFrequency(); 445 // /Calendar nominalInstanceCal = 446 // getCurrentInstance(getActionCreationtime()); 447 Calendar nominalInstanceCal = getEffectiveNominalTime(); 448 if (nominalInstanceCal == null) { 449 return -1; 450 } 451 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n); 452 /* 453 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) 454 * { return -1; } 455 */ 456 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ 457 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 458 return DateUtils.hoursInDay(nominalInstanceCal); 459 } 460 461 public static int ph3_coord_hoursInDay(int n) throws Exception { 462 return ph2_coord_hoursInDay(n); 463 } 464 465 /** 466 * Calculate number of days in one month for n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency . 467 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. 468 * Data set initial instance <p/> 6. Action Creation Time 469 * 470 * @param n instance count. 
domain: n is integer 471 * @return number of days in that month <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS 472 * @throws Exception 473 */ 474 public static int ph2_coord_daysInMonth(int n) throws Exception { 475 int datasetFrequency = (int) getDSFrequency();// in minutes 476 // Calendar nominalInstanceCal = 477 // getCurrentInstance(getActionCreationtime()); 478 Calendar nominalInstanceCal = getEffectiveNominalTime(); 479 if (nominalInstanceCal == null) { 480 return -1; 481 } 482 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n); 483 /* 484 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) 485 * { return -1; } 486 */ 487 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ 488 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 489 return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH); 490 } 491 492 public static int ph3_coord_daysInMonth(int n) throws Exception { 493 return ph2_coord_daysInMonth(n); 494 } 495 496 /** 497 * Determine the date-time in Oozie processing timezone of n-th latest available dataset instance. <p/> It depends 498 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST 499 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. 
Existence of 500 * dataset's directory 501 * 502 * @param n :instance count <p/> domain: n > 0, n is integer 503 * @return date-time in Oozie processing timezone of the n-th instance <p/> returns 'null' means n-th instance is 504 * earlier than Initial-Instance of DS 505 * @throws Exception 506 */ 507 public static String ph3_coord_latest(int n) throws Exception { 508 if (n > 0) { 509 throw new IllegalArgumentException("paramter should be <= 0 but it is " + n); 510 } 511 if (isSyncDataSet()) {// For Sync Dataset 512 return coord_latest_sync(n); 513 } 514 else { 515 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 516 } 517 } 518 519 /** 520 * Configure an evaluator with data set and application specific information. <p/> Helper method of associating 521 * dataset and application object 522 * 523 * @param evaluator : to set variables 524 * @param ds : Data Set object 525 * @param coordAction : Application instance 526 */ 527 public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) { 528 evaluator.setVariable(COORD_ACTION, coordAction); 529 evaluator.setVariable(DATASET, ds); 530 } 531 532 /** 533 * Helper method to wrap around with "${..}". 
<p/> 534 * 535 * 536 * @param eval :EL evaluator 537 * @param expr : expression to evaluate 538 * @return Resolved expression or echo back the same expression 539 * @throws Exception 540 */ 541 public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception { 542 try { 543 eval.setVariable(".wrap", null); 544 String result = eval.evaluate(expr, String.class); 545 if (eval.getVariable(".wrap") != null) { 546 return "${" + result + "}"; 547 } 548 else { 549 return result; 550 } 551 } 552 catch (Exception e) { 553 throw new Exception("Unable to evaluate :" + expr + ":\n", e); 554 } 555 } 556 557 // Set of echo functions 558 559 public static String ph1_coord_current_echo(String n) { 560 return echoUnResolved("current", n); 561 } 562 563 public static String ph2_coord_current_echo(String n) { 564 return echoUnResolved("current", n); 565 } 566 567 public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) { 568 return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit); 569 } 570 571 public static String ph1_coord_formatTime_echo(String dateTime, String format) { 572 // Quote the dateTime value since it would contain a ':'. 573 return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format); 574 } 575 576 public static String ph1_coord_latest_echo(String n) { 577 return echoUnResolved("latest", n); 578 } 579 580 public static String ph2_coord_latest_echo(String n) { 581 return ph1_coord_latest_echo(n); 582 } 583 584 public static String ph1_coord_future_echo(String n, String instance) { 585 return echoUnResolved("future", n + ", " + instance + ""); 586 } 587 588 public static String ph2_coord_future_echo(String n, String instance) { 589 return ph1_coord_future_echo(n, instance); 590 } 591 592 public static String ph1_coord_dataIn_echo(String n) { 593 ELEvaluator eval = ELEvaluator.getCurrent(); 594 String val = (String) eval.getVariable("oozie.dataname." 
+ n); 595 if (val == null || val.equals("data-in") == false) { 596 XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid"); 597 throw new RuntimeException("data_in_name " + n + " is not valid"); 598 } 599 return echoUnResolved("dataIn", "'" + n + "'"); 600 } 601 602 public static String ph1_coord_dataOut_echo(String n) { 603 ELEvaluator eval = ELEvaluator.getCurrent(); 604 String val = (String) eval.getVariable("oozie.dataname." + n); 605 if (val == null || val.equals("data-out") == false) { 606 XLog.getLog(CoordELFunctions.class).error("data_out_name " + n + " is not valid"); 607 throw new RuntimeException("data_out_name " + n + " is not valid"); 608 } 609 return echoUnResolved("dataOut", "'" + n + "'"); 610 } 611 612 public static String ph1_coord_nominalTime_echo() { 613 return echoUnResolved("nominalTime", ""); 614 } 615 616 public static String ph1_coord_nominalTime_echo_wrap() { 617 // return "${coord:nominalTime()}"; // no resolution 618 return echoUnResolved("nominalTime", ""); 619 } 620 621 public static String ph1_coord_nominalTime_echo_fixed() { 622 return "2009-03-06T010:00"; // Dummy resolution 623 } 624 625 public static String ph1_coord_actualTime_echo_wrap() { 626 // return "${coord:actualTime()}"; // no resolution 627 return echoUnResolved("actualTime", ""); 628 } 629 630 public static String ph1_coord_actionId_echo() { 631 return echoUnResolved("actionId", ""); 632 } 633 634 public static String ph1_coord_name_echo() { 635 return echoUnResolved("name", ""); 636 } 637 638 // The following echo functions are not used in any phases yet 639 // They are here for future purpose. 
640 public static String coord_minutes_echo(String n) { 641 return echoUnResolved("minutes", n); 642 } 643 644 public static String coord_hours_echo(String n) { 645 return echoUnResolved("hours", n); 646 } 647 648 public static String coord_days_echo(String n) { 649 return echoUnResolved("days", n); 650 } 651 652 public static String coord_endOfDay_echo(String n) { 653 return echoUnResolved("endOfDay", n); 654 } 655 656 public static String coord_months_echo(String n) { 657 return echoUnResolved("months", n); 658 } 659 660 public static String coord_endOfMonth_echo(String n) { 661 return echoUnResolved("endOfMonth", n); 662 } 663 664 public static String coord_actualTime_echo() { 665 return echoUnResolved("actualTime", ""); 666 } 667 668 // This echo function will always return "24" for validation only. 669 // This evaluation ****should not**** replace the original XML 670 // Create a temporary string and validate the function 671 // This is **required** for evaluating an expression like 672 // coord:HoursInDay(0) + 3 673 // actual evaluation will happen in phase 2 or phase 3. 674 public static String ph1_coord_hoursInDay_echo(String n) { 675 return "24"; 676 // return echoUnResolved("hoursInDay", n); 677 } 678 679 // This echo function will always return "30" for validation only. 680 // This evaluation ****should not**** replace the original XML 681 // Create a temporary string and validate the function 682 // This is **required** for evaluating an expression like 683 // coord:daysInMonth(0) + 3 684 // actual evaluation will happen in phase 2 or phase 3. 685 public static String ph1_coord_daysInMonth_echo(String n) { 686 // return echoUnResolved("daysInMonth", n); 687 return "30"; 688 } 689 690 // This echo function will always return "3" for validation only. 
691 // This evaluation ****should not**** replace the original XML 692 // Create a temporary string and validate the function 693 // This is **required** for evaluating an expression like coord:tzOffset + 2 694 // actual evaluation will happen in phase 2 or phase 3. 695 public static String ph1_coord_tzOffset_echo() { 696 // return echoUnResolved("tzOffset", ""); 697 return "3"; 698 } 699 700 // Local methods 701 /** 702 * @param n 703 * @return n-th instance Date-Time from current instance for data-set <p/> return empty string ("") if the 704 * Action_Creation_time or the n-th instance <p/> is earlier than the Initial_Instance of dataset. 705 * @throws Exception 706 */ 707 private static String coord_current_sync(int n) throws Exception { 708 int datasetFrequency = getDSFrequency();// in minutes 709 TimeUnit dsTimeUnit = getDSTimeUnit(); 710 int[] instCount = new int[1];// used as pass by ref 711 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount); 712 if (nominalInstanceCal == null) { 713 XLog.getLog(CoordELFunctions.class) 714 .warn("If the initial instance of the dataset is later than the nominal time, an empty string is returned. This means that no data is available at the current-instance specified by the user and the user could try modifying his initial-instance to an earlier time."); 715 return ""; 716 } 717 nominalInstanceCal = getInitialInstanceCal(); 718 int absInstanceCount = instCount[0] + n; 719 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency * absInstanceCount); 720 721 if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) { 722 XLog.getLog(CoordELFunctions.class) 723 .warn("If the initial instance of the dataset is later than the current-instance specified, such as coord:current({0}) in this case, an empty string is returned. 
This means that no data is available at the current-instance specified by the user and the user could try modifying his initial-instance to an earlier time.", n); 724 return ""; 725 } 726 String str = DateUtils.formatDateOozieTZ(nominalInstanceCal); 727 return str; 728 } 729 730 /** 731 * @param offset 732 * @return n-th available latest instance Date-Time for SYNC data-set 733 * @throws Exception 734 */ 735 private static String coord_latest_sync(int offset) throws Exception { 736 if (offset > 0) { 737 throw new RuntimeException("For latest there is no meaning " + "of positive instance. n should be <=0" 738 + offset); 739 } 740 ELEvaluator eval = ELEvaluator.getCurrent(); 741 String retVal = ""; 742 int datasetFrequency = (int) getDSFrequency();// in minutes 743 TimeUnit dsTimeUnit = getDSTimeUnit(); 744 int[] instCount = new int[1]; 745 Calendar nominalInstanceCal = getCurrentInstance(getActualTime(), instCount); 746 if (nominalInstanceCal != null) { 747 Calendar initInstance = getInitialInstanceCal(); 748 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 749 if (ds == null) { 750 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 751 } 752 String uriTemplate = ds.getUriTemplate(); 753 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION); 754 if (conf == null) { 755 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION); 756 } 757 int available = 0; 758 boolean resolved = false; 759 String user = ParamChecker 760 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); 761 String doneFlag = ds.getDoneFlag(); 762 while (nominalInstanceCal.compareTo(initInstance) >= 0) { 763 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal); 764 String uriPath = uriEval.evaluate(uriTemplate, String.class); 765 String pathWithDoneFlag = uriPath; 766 if (doneFlag.length() > 0) { 767 pathWithDoneFlag += "/" + doneFlag; 768 } 769 if 
(isPathAvailable(pathWithDoneFlag, user, null, conf)) { 770 XLog.getLog(CoordELFunctions.class).debug("Found latest(" + available + "): " + pathWithDoneFlag); 771 if (available == offset) { 772 XLog.getLog(CoordELFunctions.class).debug("Found Latest File: " + pathWithDoneFlag); 773 resolved = true; 774 retVal = DateUtils.formatDateOozieTZ(nominalInstanceCal); 775 eval.setVariable("resolved_path", uriPath); 776 break; 777 } 778 779 available--; 780 } 781 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), 782 // -datasetFrequency); 783 nominalInstanceCal = (Calendar) initInstance.clone(); 784 instCount[0]--; 785 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 786 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 787 } 788 if (!resolved) { 789 // return unchanged latest function with variable 'is_resolved' 790 // to 'false' 791 eval.setVariable("is_resolved", Boolean.FALSE); 792 retVal = "${coord:latest(" + offset + ")}"; 793 } 794 else { 795 eval.setVariable("is_resolved", Boolean.TRUE); 796 } 797 } 798 else {// No feasible nominal time 799 eval.setVariable("is_resolved", Boolean.FALSE); 800 } 801 return retVal; 802 } 803 804 // TODO : Not an efficient way. 
In a loop environment, we could do something 805 // outside the loop 806 /** 807 * Check whether a URI path exists 808 * 809 * @param sPath 810 * @param conf 811 * @return 812 * @throws IOException 813 */ 814 815 private static boolean isPathAvailable(String sPath, String user, String group, Configuration conf) 816 throws IOException, HadoopAccessorException { 817 // sPath += "/" + END_OF_OPERATION_INDICATOR_FILE; 818 Path path = new Path(sPath); 819 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 820 Configuration fsConf = has.createJobConf(path.toUri().getAuthority()); 821 return has.createFileSystem(user, path.toUri(), fsConf).exists(path); 822 } 823 824 /** 825 * @param tm 826 * @return a new Evaluator to be used for URI-template evaluation 827 */ 828 private static ELEvaluator getUriEvaluator(Calendar tm) { 829 ELEvaluator retEval = new ELEvaluator(); 830 retEval.setVariable("YEAR", tm.get(Calendar.YEAR)); 831 retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm 832 .get(Calendar.MONTH) + 1)); 833 retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm 834 .get(Calendar.DAY_OF_MONTH)); 835 retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm 836 .get(Calendar.HOUR_OF_DAY)); 837 retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? 
"0" + tm.get(Calendar.MINUTE) : tm 838 .get(Calendar.MINUTE)); 839 return retEval; 840 } 841 842 /** 843 * @return whether a data set is SYNCH or ASYNC 844 */ 845 private static boolean isSyncDataSet() { 846 ELEvaluator eval = ELEvaluator.getCurrent(); 847 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 848 if (ds == null) { 849 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 850 } 851 return ds.getType().equalsIgnoreCase("SYNC"); 852 } 853 854 /** 855 * Check whether a function should be resolved. 856 * 857 * @param functionName 858 * @param n 859 * @return null if the functionName needs to be resolved otherwise return the calling function unresolved. 860 */ 861 private static String checkIfResolved(String functionName, String n) { 862 ELEvaluator eval = ELEvaluator.getCurrent(); 863 String replace = (String) eval.getVariable("resolve_" + functionName); 864 if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't 865 // resolve 866 // return "${coord:" + functionName + "(" + n +")}"; //Unresolved 867 eval.setVariable(".wrap", "true"); 868 return "coord:" + functionName + "(" + n + ")"; // Unresolved 869 } 870 return null; // Resolved it 871 } 872 873 private static String echoUnResolved(String functionName, String n) { 874 return echoUnResolvedPre(functionName, n, "coord:"); 875 } 876 877 private static String echoUnResolvedPre(String functionName, String n, String prefix) { 878 ELEvaluator eval = ELEvaluator.getCurrent(); 879 eval.setVariable(".wrap", "true"); 880 return prefix + functionName + "(" + n + ")"; // Unresolved 881 } 882 883 /** 884 * @return the initial instance of a DataSet in DATE 885 */ 886 private static Date getInitialInstance() { 887 return getInitialInstanceCal().getTime(); 888 // return ds.getInitInstance(); 889 } 890 891 /** 892 * @return the initial instance of a DataSet in Calendar 893 */ 894 private static Calendar getInitialInstanceCal() { 895 
ELEvaluator eval = ELEvaluator.getCurrent(); 896 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 897 if (ds == null) { 898 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 899 } 900 Calendar effInitTS = Calendar.getInstance(); 901 effInitTS.setTime(ds.getInitInstance()); 902 effInitTS.setTimeZone(ds.getTimeZone()); 903 // To adjust EOD/EOM 904 DateUtils.moveToEnd(effInitTS, getDSEndOfFlag()); 905 return effInitTS; 906 // return ds.getInitInstance(); 907 } 908 909 /** 910 * @return Nominal or action creation Time when all the dependencies of an application instance are met. 911 */ 912 private static Date getActionCreationtime() { 913 ELEvaluator eval = ELEvaluator.getCurrent(); 914 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 915 if (coordAction == null) { 916 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 917 } 918 return coordAction.getNominalTime(); 919 } 920 921 /** 922 * @return Actual Time when all the dependencies of an application instance are met. 923 */ 924 private static Date getActualTime() { 925 ELEvaluator eval = ELEvaluator.getCurrent(); 926 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 927 if (coordAction == null) { 928 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 929 } 930 return coordAction.getActualTime(); 931 } 932 933 /** 934 * @return TimeZone for the application or job. 
935 */ 936 private static TimeZone getJobTZ() { 937 ELEvaluator eval = ELEvaluator.getCurrent(); 938 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 939 if (coordAction == null) { 940 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 941 } 942 return coordAction.getTimeZone(); 943 } 944 945 /** 946 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time) 947 * 948 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of 949 * the dataset. 950 */ 951 private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) { 952 Date datasetInitialInstance = getInitialInstance(); 953 TimeUnit dsTimeUnit = getDSTimeUnit(); 954 TimeZone dsTZ = getDatasetTZ(); 955 // Convert Date to Calendar for corresponding TZ 956 Calendar current = Calendar.getInstance(); 957 current.setTime(datasetInitialInstance); 958 current.setTimeZone(dsTZ); 959 960 Calendar calEffectiveTime = Calendar.getInstance(); 961 calEffectiveTime.setTime(effectiveTime); 962 calEffectiveTime.setTimeZone(dsTZ); 963 instanceCount[0] = 0; 964 if (current.compareTo(calEffectiveTime) > 0) { 965 // Nominal Time < initial Instance 966 // TODO: getClass() call doesn't work from static method. 
967 // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+ 968 // current.getTime()); 969 return null; 970 } 971 Calendar origCurrent = (Calendar) current.clone(); 972 while (current.compareTo(calEffectiveTime) <= 0) { 973 current = (Calendar) origCurrent.clone(); 974 instanceCount[0]++; 975 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * getDSFrequency()); 976 } 977 instanceCount[0]--; 978 979 current = (Calendar) origCurrent.clone(); 980 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * getDSFrequency()); 981 return current; 982 } 983 984 private static Calendar getEffectiveNominalTime() { 985 Date datasetInitialInstance = getInitialInstance(); 986 TimeZone dsTZ = getDatasetTZ(); 987 // Convert Date to Calendar for corresponding TZ 988 Calendar current = Calendar.getInstance(); 989 current.setTime(datasetInitialInstance); 990 current.setTimeZone(dsTZ); 991 992 Calendar calEffectiveTime = Calendar.getInstance(); 993 calEffectiveTime.setTime(getActionCreationtime()); 994 calEffectiveTime.setTimeZone(dsTZ); 995 if (current.compareTo(calEffectiveTime) > 0) { 996 // Nominal Time < initial Instance 997 // TODO: getClass() call doesn't work from static method. 
998 // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+ 999 // current.getTime()); 1000 return null; 1001 } 1002 return calEffectiveTime; 1003 } 1004 1005 /** 1006 * @return dataset frequency in minutes 1007 */ 1008 private static int getDSFrequency() { 1009 ELEvaluator eval = ELEvaluator.getCurrent(); 1010 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1011 if (ds == null) { 1012 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1013 } 1014 return ds.getFrequency(); 1015 } 1016 1017 /** 1018 * @return dataset TimeUnit 1019 */ 1020 private static TimeUnit getDSTimeUnit() { 1021 ELEvaluator eval = ELEvaluator.getCurrent(); 1022 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1023 if (ds == null) { 1024 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1025 } 1026 return ds.getTimeUnit(); 1027 } 1028 1029 /** 1030 * @return dataset TimeZone 1031 */ 1032 private static TimeZone getDatasetTZ() { 1033 ELEvaluator eval = ELEvaluator.getCurrent(); 1034 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1035 if (ds == null) { 1036 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1037 } 1038 return ds.getTimeZone(); 1039 } 1040 1041 /** 1042 * @return dataset TimeUnit 1043 */ 1044 private static TimeUnit getDSEndOfFlag() { 1045 ELEvaluator eval = ELEvaluator.getCurrent(); 1046 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1047 if (ds == null) { 1048 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1049 } 1050 return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration(); 1051 } 1052 1053 /** 1054 * Return a job configuration property for the coordinator. 1055 * 1056 * @param property property name. 1057 * @return the value of the property, <code>null</code> if the property is undefined. 
1058 */ 1059 public static String coord_conf(String property) { 1060 ELEvaluator eval = ELEvaluator.getCurrent(); 1061 return (String) eval.getVariable(property); 1062 } 1063 1064 /** 1065 * Return the user that submitted the coordinator job. 1066 * 1067 * @return the user that submitted the coordinator job. 1068 */ 1069 public static String coord_user() { 1070 ELEvaluator eval = ELEvaluator.getCurrent(); 1071 return (String) eval.getVariable(OozieClient.USER_NAME); 1072 } 1073 }