001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.coord; 020 021import java.net.URI; 022import java.net.URISyntaxException; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.oozie.DagELFunctions; 026import org.apache.oozie.client.WorkflowJob; 027import org.apache.oozie.dependency.URIHandler; 028import org.apache.oozie.service.Services; 029import org.apache.oozie.service.URIHandlerService; 030import org.apache.oozie.util.ELEvaluator; 031import org.apache.oozie.util.HCatURI; 032import org.apache.oozie.util.XLog; 033 034/** 035 * This class implements the EL function for HCat datasets in coordinator 036 */ 037 038public class HCatELFunctions { 039 private static final Configuration EMPTY_CONF = new Configuration(true); 040 041 enum EventType { 042 input, output 043 } 044 045 /* Workflow Parameterization EL functions */ 046 047 /** 048 * Return true if partitions exists or false if not. 049 * 050 * @param uri hcatalog partition uri. 051 * @return <code>true</code> if the uri exists, <code>false</code> if it does not. 052 * @throws Exception 053 */ 054 public static boolean hcat_exists(String uri) throws Exception { 055 URI hcatURI = new URI(uri); 056 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 057 URIHandler handler = uriService.getURIHandler(hcatURI); 058 WorkflowJob workflow = DagELFunctions.getWorkflow(); 059 String user = workflow.getUser(); 060 return handler.exists(hcatURI, EMPTY_CONF, user); 061 } 062 063 /* Coord EL functions */ 064 065 /** 066 * Echo the same EL function without evaluating anything 067 * 068 * @param dataInName 069 * @return the same EL function 070 */ 071 public static String ph1_coord_databaseIn_echo(String dataInName) { 072 // Checking if the dataIn is correct? 073 isValidDataEvent(dataInName); 074 return echoUnResolved("databaseIn", "'" + dataInName + "'"); 075 } 076 077 public static String ph1_coord_databaseOut_echo(String dataName) { 078 // Checking if the dataOut is correct? 079 isValidDataEvent(dataName); 080 return echoUnResolved("databaseOut", "'" + dataName + "'"); 081 } 082 083 public static String ph1_coord_tableIn_echo(String dataName) { 084 // Checking if the dataIn is correct? 085 isValidDataEvent(dataName); 086 return echoUnResolved("tableIn", "'" + dataName + "'"); 087 } 088 089 public static String ph1_coord_tableOut_echo(String dataName) { 090 // Checking if the dataOut is correct? 091 isValidDataEvent(dataName); 092 return echoUnResolved("tableOut", "'" + dataName + "'"); 093 } 094 095 public static String ph1_coord_dataInPartitionFilter_echo(String dataInName, String type) { 096 // Checking if the dataIn/dataOut is correct? 097 isValidDataEvent(dataInName); 098 return echoUnResolved("dataInPartitionFilter", "'" + dataInName + "', '" + type + "'"); 099 } 100 101 public static String ph1_coord_dataInPartitionMin_echo(String dataInName, String partition) { 102 // Checking if the dataIn/dataOut is correct? 103 isValidDataEvent(dataInName); 104 return echoUnResolved("dataInPartitionMin", "'" + dataInName + "', '" + partition + "'"); 105 } 106 107 public static String ph1_coord_dataInPartitionMax_echo(String dataInName, String partition) { 108 // Checking if the dataIn/dataOut is correct? 109 isValidDataEvent(dataInName); 110 return echoUnResolved("dataInPartitionMax", "'" + dataInName + "', '" + partition + "'"); 111 } 112 113 public static String ph1_coord_dataOutPartitions_echo(String dataOutName) { 114 // Checking if the dataIn/dataOut is correct? 115 isValidDataEvent(dataOutName); 116 return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'"); 117 } 118 119 public static String ph1_coord_dataInPartitions_echo(String dataInName, String type) { 120 // Checking if the dataIn/dataOut is correct? 121 isValidDataEvent(dataInName); 122 return echoUnResolved("dataInPartitions", "'" + dataInName + "', '" + type + "'"); 123 } 124 125 public static String ph1_coord_dataOutPartitionValue_echo(String dataOutName, String partition) { 126 // Checking if the dataIn/dataOut is correct? 127 isValidDataEvent(dataOutName); 128 return echoUnResolved("dataOutPartitionValue", "'" + dataOutName + "', '" + partition + "'"); 129 } 130 131 /** 132 * Extract the hcat DB name from the URI-template associate with 133 * 'dataInName'. Caller needs to specify the EL-evaluator level variable 134 * 'oozie.coord.el.dataset.bean' with synchronous dataset object 135 * (SyncCoordDataset) 136 * 137 * @param dataInName 138 * @return DB name 139 */ 140 public static String ph3_coord_databaseIn(String dataInName) { 141 HCatURI hcatURI = getURIFromResolved(dataInName, EventType.input); 142 if (hcatURI != null) { 143 return hcatURI.getDb(); 144 } 145 else { 146 return ""; 147 } 148 } 149 150 /** 151 * Extract the hcat DB name from the URI-template associate with 152 * 'dataOutName'. Caller needs to specify the EL-evaluator level variable 153 * 'oozie.coord.el.dataset.bean' with synchronous dataset object 154 * (SyncCoordDataset) 155 * 156 * @param dataOutName 157 * @return DB name 158 */ 159 public static String ph3_coord_databaseOut(String dataOutName) { 160 HCatURI hcatURI = getURIFromResolved(dataOutName, EventType.output); 161 if (hcatURI != null) { 162 return hcatURI.getDb(); 163 } 164 else { 165 return ""; 166 } 167 } 168 169 /** 170 * Extract the hcat Table name from the URI-template associate with 171 * 'dataInName'. Caller needs to specify the EL-evaluator level variable 172 * 'oozie.coord.el.dataset.bean' with synchronous dataset object 173 * (SyncCoordDataset) 174 * 175 * @param dataInName 176 * @return Table name 177 */ 178 public static String ph3_coord_tableIn(String dataInName) { 179 HCatURI hcatURI = getURIFromResolved(dataInName, EventType.input); 180 if (hcatURI != null) { 181 return hcatURI.getTable(); 182 } 183 else { 184 return ""; 185 } 186 } 187 188 /** 189 * Extract the hcat Table name from the URI-template associate with 190 * 'dataOutName'. Caller needs to specify the EL-evaluator level variable 191 * 'oozie.coord.el.dataset.bean' with synchronous dataset object 192 * (SyncCoordDataset) 193 * 194 * @param dataOutName 195 * @return Table name 196 */ 197 public static String ph3_coord_tableOut(String dataOutName) { 198 HCatURI hcatURI = getURIFromResolved(dataOutName, EventType.output); 199 if (hcatURI != null) { 200 return hcatURI.getTable(); 201 } 202 else { 203 return ""; 204 } 205 } 206 207 /** 208 * Used to specify the HCat partition filter which is input dependency for workflow job.<p> Look for two evaluator-level 209 * variables <p> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p> A defines the current list of 210 * HCat URIs. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something 211 * unresolved, this function will echo back the original function <p> otherwise it sends the partition filter. 212 * 213 * @param dataInName : Datain name 214 * @param type : for action type - pig, MR or hive 215 */ 216 public static String ph3_coord_dataInPartitionFilter(String dataInName, String type) { 217 ELEvaluator eval = ELEvaluator.getCurrent(); 218 String uris = (String) eval.getVariable(".datain." + dataInName); 219 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); 220 if (unresolved != null && unresolved.booleanValue() == true) { 221 return "${coord:dataInPartitionFilter('" + dataInName + "', '" + type + "')}"; 222 } 223 return createPartitionFilter(uris, type); 224 } 225 226 /** 227 * Used to specify the HCat partition's value defining output for workflow job.<p> Look for two evaluator-level 228 * variables <p> A) .dataout.<DATAOUT_NAME> B) .dataout.<DATAOUT_NAME>.unresolved <p> A defines the current list of 229 * HCat URIs. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something 230 * unresolved, this function will echo back the original function <p> otherwise it sends the partition value. 231 * 232 * @param dataOutName : Dataout name 233 * @param partitionName : Specific partition name whose value is wanted 234 */ 235 public static String ph3_coord_dataOutPartitionValue(String dataOutName, String partitionName) { 236 ELEvaluator eval = ELEvaluator.getCurrent(); 237 String uri = (String) eval.getVariable(".dataout." + dataOutName); 238 Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved"); 239 if (unresolved != null && unresolved.booleanValue() == true) { 240 return "${coord:dataOutPartitionValue('" + dataOutName + "', '" + partitionName + "')}"; 241 } 242 try { 243 HCatURI hcatUri = new HCatURI(uri); 244 return hcatUri.getPartitionValue(partitionName); 245 } 246 catch(URISyntaxException urie) { 247 XLog.getLog(HCatELFunctions.class).warn("Exception with uriTemplate [{0}]. Reason [{1}]: ", uri, urie); 248 throw new RuntimeException("HCat URI can't be parsed " + urie); 249 } 250 } 251 252 /** 253 * Used to specify the entire HCat partition defining output for workflow job.<p> Look for two evaluator-level 254 * variables <p> A) .dataout.<DATAOUT_NAME> B) .dataout.<DATAOUT_NAME>.unresolved <p> A defines the data-out 255 * HCat URI. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something 256 * unresolved, this function will echo back the original function <p> otherwise it sends the partition. 257 * 258 * @param dataOutName : DataOut name 259 */ 260 public static String ph3_coord_dataOutPartitions(String dataOutName) { 261 ELEvaluator eval = ELEvaluator.getCurrent(); 262 String uri = (String) eval.getVariable(".dataout." + dataOutName); 263 Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved"); 264 if (unresolved != null && unresolved.booleanValue() == true) { 265 return "${coord:dataOutPartitions('" + dataOutName + "')}"; 266 } 267 try { 268 return new HCatURI(uri).toPartitionString(); 269 } 270 catch (URISyntaxException e) { 271 throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e); 272 } 273 } 274 275 /** 276 * Used to specify the entire HCat partition defining input for workflow job. <p> Look for two evaluator-level 277 * variables <p> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p> A defines the data-in HCat URI. 278 * <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something unresolved, 279 * this function will echo back the original function <p> otherwise it sends the partition. 280 * 281 * @param dataInName : DataIn name 282 * @param type : for action type: hive-export 283 */ 284 public static String ph3_coord_dataInPartitions(String dataInName, String type) { 285 ELEvaluator eval = ELEvaluator.getCurrent(); 286 String uri = (String) eval.getVariable(".datain." + dataInName); 287 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); 288 if (unresolved != null && unresolved.booleanValue() == true) { 289 return "${coord:dataInPartitions('" + dataInName + "', '" + type + "')}"; 290 } 291 String partitionValue = null; 292 if (uri != null) { 293 if (type.equals("hive-export")) { 294 String[] uriList = uri.split(CoordELFunctions.DIR_SEPARATOR); 295 if (uriList.length > 1) { 296 throw new RuntimeException("Multiple partitions not supported for hive-export type. Dataset name: " 297 + dataInName + " URI: " + uri); 298 } 299 try { 300 partitionValue = new HCatURI(uri).toPartitionValueString(type); 301 } 302 catch (URISyntaxException e) { 303 throw new RuntimeException("Parsing exception for HCatURI " + uri, e); 304 } 305 } else { 306 throw new RuntimeException("Unsupported type: " + type + " dataset name: " + dataInName); 307 } 308 } 309 else { 310 XLog.getLog(HCatELFunctions.class).warn("URI is null"); 311 return null; 312 } 313 return partitionValue; 314 } 315 316 /** 317 * Used to specify the MAXIMUM value of an HCat partition which is input dependency for workflow job.<p> Look for two evaluator-level 318 * variables <p> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p> A defines the current list of 319 * HCat URIs. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something 320 * unresolved, this function will echo back the original function <p> otherwise it sends the max partition value. 321 * 322 * @param dataInName : Datain name 323 * @param partitionName : Specific partition name whose MAX value is wanted 324 */ 325 public static String ph3_coord_dataInPartitionMin(String dataInName, String partitionName) { 326 ELEvaluator eval = ELEvaluator.getCurrent(); 327 String uris = (String) eval.getVariable(".datain." + dataInName); 328 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); 329 if (unresolved != null && unresolved.booleanValue() == true) { 330 return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}"; 331 } 332 String minPartition = null; 333 if (uris != null) { 334 String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR); 335 // get the partition values list and find minimum 336 try { 337 // initialize minValue with first partition value 338 minPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName); 339 if (minPartition == null || minPartition.isEmpty()) { 340 throw new RuntimeException("No value in data-in uri for partition key: " + partitionName); 341 } 342 for (int i = 1; i < uriList.length; i++) { 343 String value = new HCatURI(uriList[i]).getPartitionValue(partitionName); 344 if(value.compareTo(minPartition) < 0) { //sticking to string comparison since some numerical date 345 //values can also contain letters e.g. 20120101T0300Z (UTC) 346 minPartition = value; 347 } 348 } 349 } 350 catch(URISyntaxException urie) { 351 throw new RuntimeException("HCat URI can't be parsed " + urie); 352 } 353 } 354 else { 355 XLog.getLog(HCatELFunctions.class).warn("URI is null"); 356 return null; 357 } 358 return minPartition; 359 } 360 361 /** 362 * Used to specify the MINIMUM value of an HCat partition which is input dependency for workflow job.<p> Look for two evaluator-level 363 * variables <p> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p> A defines the current list of 364 * HCat URIs. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something 365 * unresolved, this function will echo back the original function <p> otherwise it sends the min partition value. 366 * 367 * @param dataInName : Datain name 368 * @param partitionName : Specific partition name whose MIN value is wanted 369 */ 370 public static String ph3_coord_dataInPartitionMax(String dataInName, String partitionName) { 371 ELEvaluator eval = ELEvaluator.getCurrent(); 372 String uris = (String) eval.getVariable(".datain." + dataInName); 373 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); 374 if (unresolved != null && unresolved.booleanValue() == true) { 375 return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}"; 376 } 377 String maxPartition = null; 378 if (uris != null) { 379 String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR); 380 // get the partition values list and find minimum 381 try { 382 // initialize minValue with first partition value 383 maxPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName); 384 if (maxPartition == null || maxPartition.isEmpty()) { 385 throw new RuntimeException("No value in data-in uri for partition key: " + partitionName); 386 } 387 for(int i = 1; i < uriList.length; i++) { 388 String value = new HCatURI(uriList[i]).getPartitionValue(partitionName); 389 if(value.compareTo(maxPartition) > 0) { 390 maxPartition = value; 391 } 392 } 393 } 394 catch(URISyntaxException urie) { 395 throw new RuntimeException("HCat URI can't be parsed " + urie); 396 } 397 } 398 else { 399 XLog.getLog(HCatELFunctions.class).warn("URI is null"); 400 return null; 401 } 402 return maxPartition; 403 } 404 405 private static String createPartitionFilter(String uris, String type) { 406 String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR); 407 StringBuilder filter = new StringBuilder(""); 408 if (uriList.length > 0) { 409 for (String uri : uriList) { 410 if (filter.length() > 0) { 411 filter.append(" OR "); 412 } 413 try { 414 filter.append(new HCatURI(uri).toPartitionFilter(type)); 415 } 416 catch (URISyntaxException e) { 417 throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e); 418 } 419 } 420 } 421 return filter.toString(); 422 } 423 424 private static HCatURI getURIFromResolved(String dataInName, EventType type) { 425 final XLog LOG = XLog.getLog(HCatELFunctions.class); 426 StringBuilder uriTemplate = new StringBuilder(); 427 ELEvaluator eval = ELEvaluator.getCurrent(); 428 String uris; 429 if(type == EventType.input) { 430 uris = (String) eval.getVariable(".datain." + dataInName); 431 } 432 else { //type=output 433 uris = (String) eval.getVariable(".dataout." + dataInName); 434 } 435 if (uris != null) { 436 String[] uri = uris.split(CoordELFunctions.DIR_SEPARATOR, -1); 437 uriTemplate.append(uri[0]); 438 } 439 else { 440 LOG.warn("URI is NULL"); 441 return null; 442 } 443 LOG.info("uriTemplate [{0}] ", uriTemplate); 444 HCatURI hcatURI; 445 try { 446 hcatURI = new HCatURI(uriTemplate.toString()); 447 } 448 catch (URISyntaxException e) { 449 LOG.info("uriTemplate [{0}]. Reason [{1}]: ", uriTemplate, e); 450 throw new RuntimeException("HCat URI can't be parsed " + e); 451 } 452 return hcatURI; 453 } 454 455 private static boolean isValidDataEvent(String dataInName) { 456 ELEvaluator eval = ELEvaluator.getCurrent(); 457 String val = (String) eval.getVariable("oozie.dataname." + dataInName); 458 if (val == null || (val.equals("data-in") == false && val.equals("data-out") == false)) { 459 XLog.getLog(HCatELFunctions.class).error("dataset name " + dataInName + " is not valid. val :" + val); 460 throw new RuntimeException("data set name " + dataInName + " is not valid"); 461 } 462 return true; 463 } 464 465 private static String echoUnResolved(String functionName, String n) { 466 return echoUnResolvedPre(functionName, n, "coord:"); 467 } 468 469 private static String echoUnResolvedPre(String functionName, String n, String prefix) { 470 ELEvaluator eval = ELEvaluator.getCurrent(); 471 eval.setVariable(".wrap", "true"); 472 return prefix + functionName + "(" + n + ")"; // Unresolved 473 } 474 475}