001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.coord; 019 020import java.net.URI; 021import java.net.URISyntaxException; 022 023import org.apache.hadoop.conf.Configuration; 024import org.apache.oozie.DagELFunctions; 025import org.apache.oozie.client.WorkflowJob; 026import org.apache.oozie.dependency.URIHandler; 027import org.apache.oozie.service.Services; 028import org.apache.oozie.service.URIHandlerService; 029import org.apache.oozie.util.ELEvaluator; 030import org.apache.oozie.util.HCatURI; 031import org.apache.oozie.util.XLog; 032 033/** 034 * This class implements the EL function for HCat datasets in coordinator 035 */ 036 037public class HCatELFunctions { 038 private static final Configuration EMPTY_CONF = new Configuration(true); 039 040 enum EventType { 041 input, output 042 } 043 044 /* Workflow Parameterization EL functions */ 045 046 /** 047 * Return true if partitions exists or false if not. 048 * 049 * @param uri hcatalog partition uri. 050 * @return <code>true</code> if the uri exists, <code>false</code> if it does not. 051 * @throws Exception 052 */ 053 public static boolean hcat_exists(String uri) throws Exception { 054 URI hcatURI = new URI(uri); 055 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 056 URIHandler handler = uriService.getURIHandler(hcatURI); 057 WorkflowJob workflow = DagELFunctions.getWorkflow(); 058 String user = workflow.getUser(); 059 return handler.exists(hcatURI, EMPTY_CONF, user); 060 } 061 062 /* Coord EL functions */ 063 064 /** 065 * Echo the same EL function without evaluating anything 066 * 067 * @param dataInName 068 * @return the same EL function 069 */ 070 public static String ph1_coord_databaseIn_echo(String dataName) { 071 // Checking if the dataIn is correct? 072 isValidDataEvent(dataName); 073 return echoUnResolved("databaseIn", "'" + dataName + "'"); 074 } 075 076 public static String ph1_coord_databaseOut_echo(String dataName) { 077 // Checking if the dataOut is correct? 078 isValidDataEvent(dataName); 079 return echoUnResolved("databaseOut", "'" + dataName + "'"); 080 } 081 082 public static String ph1_coord_tableIn_echo(String dataName) { 083 // Checking if the dataIn is correct? 084 isValidDataEvent(dataName); 085 return echoUnResolved("tableIn", "'" + dataName + "'"); 086 } 087 088 public static String ph1_coord_tableOut_echo(String dataName) { 089 // Checking if the dataOut is correct? 090 isValidDataEvent(dataName); 091 return echoUnResolved("tableOut", "'" + dataName + "'"); 092 } 093 094 public static String ph1_coord_dataInPartitionFilter_echo(String dataInName, String type) { 095 // Checking if the dataIn/dataOut is correct? 096 isValidDataEvent(dataInName); 097 return echoUnResolved("dataInPartitionFilter", "'" + dataInName + "', '" + type + "'"); 098 } 099 100 public static String ph1_coord_dataInPartitionMin_echo(String dataInName, String partition) { 101 // Checking if the dataIn/dataOut is correct? 102 isValidDataEvent(dataInName); 103 return echoUnResolved("dataInPartitionMin", "'" + dataInName + "', '" + partition + "'"); 104 } 105 106 public static String ph1_coord_dataInPartitionMax_echo(String dataInName, String partition) { 107 // Checking if the dataIn/dataOut is correct? 108 isValidDataEvent(dataInName); 109 return echoUnResolved("dataInPartitionMax", "'" + dataInName + "', '" + partition + "'"); 110 } 111 112 public static String ph1_coord_dataOutPartitions_echo(String dataOutName) { 113 // Checking if the dataIn/dataOut is correct? 114 isValidDataEvent(dataOutName); 115 return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'"); 116 } 117 118 public static String ph1_coord_dataInPartitions_echo(String dataInName, String type) { 119 // Checking if the dataIn/dataOut is correct? 120 isValidDataEvent(dataInName); 121 return echoUnResolved("dataInPartitions", "'" + dataInName + "', '" + type + "'"); 122 } 123 124 public static String ph1_coord_dataOutPartitionValue_echo(String dataOutName, String partition) { 125 // Checking if the dataIn/dataOut is correct? 126 isValidDataEvent(dataOutName); 127 return echoUnResolved("dataOutPartitionValue", "'" + dataOutName + "', '" + partition + "'"); 128 } 129 130 /** 131 * Extract the hcat DB name from the URI-template associate with 132 * 'dataInName'. Caller needs to specify the EL-evaluator level variable 133 * 'oozie.coord.el.dataset.bean' with synchronous dataset object 134 * (SyncCoordDataset) 135 * 136 * @param dataInName 137 * @return DB name 138 */ 139 public static String ph3_coord_databaseIn(String dataName) { 140 HCatURI hcatURI = getURIFromResolved(dataName, EventType.input); 141 if (hcatURI != null) { 142 return hcatURI.getDb(); 143 } 144 else { 145 return ""; 146 } 147 } 148 149 /** 150 * Extract the hcat DB name from the URI-template associate with 151 * 'dataOutName'. Caller needs to specify the EL-evaluator level variable 152 * 'oozie.coord.el.dataset.bean' with synchronous dataset object 153 * (SyncCoordDataset) 154 * 155 * @param dataOutName 156 * @return DB name 157 */ 158 public static String ph3_coord_databaseOut(String dataName) { 159 HCatURI hcatURI = getURIFromResolved(dataName, EventType.output); 160 if (hcatURI != null) { 161 return hcatURI.getDb(); 162 } 163 else { 164 return ""; 165 } 166 } 167 168 /** 169 * Extract the hcat Table name from the URI-template associate with 170 * 'dataInName'. Caller needs to specify the EL-evaluator level variable 171 * 'oozie.coord.el.dataset.bean' with synchronous dataset object 172 * (SyncCoordDataset) 173 * 174 * @param dataInName 175 * @return Table name 176 */ 177 public static String ph3_coord_tableIn(String dataName) { 178 HCatURI hcatURI = getURIFromResolved(dataName, EventType.input); 179 if (hcatURI != null) { 180 return hcatURI.getTable(); 181 } 182 else { 183 return ""; 184 } 185 } 186 187 /** 188 * Extract the hcat Table name from the URI-template associate with 189 * 'dataOutName'. Caller needs to specify the EL-evaluator level variable 190 * 'oozie.coord.el.dataset.bean' with synchronous dataset object 191 * (SyncCoordDataset) 192 * 193 * @param dataOutName 194 * @return Table name 195 */ 196 public static String ph3_coord_tableOut(String dataName) { 197 HCatURI hcatURI = getURIFromResolved(dataName, EventType.output); 198 if (hcatURI != null) { 199 return hcatURI.getTable(); 200 } 201 else { 202 return ""; 203 } 204 } 205 206 /** 207 * Used to specify the HCat partition filter which is input dependency for workflow job.<p/> Look for two evaluator-level 208 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of 209 * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something 210 * unresolved, this function will echo back the original function <p/> otherwise it sends the partition filter. 211 * 212 * @param dataInName : Datain name 213 * @param type : for action type - pig, MR or hive 214 */ 215 public static String ph3_coord_dataInPartitionFilter(String dataInName, String type) { 216 ELEvaluator eval = ELEvaluator.getCurrent(); 217 String uris = (String) eval.getVariable(".datain." + dataInName); 218 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); 219 if (unresolved != null && unresolved.booleanValue() == true) { 220 return "${coord:dataInPartitionFilter('" + dataInName + "', '" + type + "')}"; 221 } 222 return createPartitionFilter(uris, type); 223 } 224 225 /** 226 * Used to specify the HCat partition's value defining output for workflow job.<p/> Look for two evaluator-level 227 * variables <p/> A) .dataout.<DATAOUT_NAME> B) .dataout.<DATAOUT_NAME>.unresolved <p/> A defines the current list of 228 * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something 229 * unresolved, this function will echo back the original function <p/> otherwise it sends the partition value. 230 * 231 * @param dataOutName : Dataout name 232 * @param partitionName : Specific partition name whose value is wanted 233 */ 234 public static String ph3_coord_dataOutPartitionValue(String dataOutName, String partitionName) { 235 ELEvaluator eval = ELEvaluator.getCurrent(); 236 String uri = (String) eval.getVariable(".dataout." + dataOutName); 237 Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved"); 238 if (unresolved != null && unresolved.booleanValue() == true) { 239 return "${coord:dataOutPartitionValue('" + dataOutName + "', '" + partitionName + "')}"; 240 } 241 try { 242 HCatURI hcatUri = new HCatURI(uri); 243 return hcatUri.getPartitionValue(partitionName); 244 } 245 catch(URISyntaxException urie) { 246 XLog.getLog(HCatELFunctions.class).warn("Exception with uriTemplate [{0}]. Reason [{1}]: ", uri, urie); 247 throw new RuntimeException("HCat URI can't be parsed " + urie); 248 } 249 } 250 251 /** 252 * Used to specify the entire HCat partition defining output for workflow job.<p/> Look for two evaluator-level 253 * variables <p/> A) .dataout.<DATAOUT_NAME> B) .dataout.<DATAOUT_NAME>.unresolved <p/> A defines the data-out 254 * HCat URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something 255 * unresolved, this function will echo back the original function <p/> otherwise it sends the partition. 256 * 257 * @param dataOutName : DataOut name 258 */ 259 public static String ph3_coord_dataOutPartitions(String dataOutName) { 260 ELEvaluator eval = ELEvaluator.getCurrent(); 261 String uri = (String) eval.getVariable(".dataout." + dataOutName); 262 Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved"); 263 if (unresolved != null && unresolved.booleanValue() == true) { 264 return "${coord:dataOutPartitions('" + dataOutName + "')}"; 265 } 266 try { 267 return new HCatURI(uri).toPartitionString(); 268 } 269 catch (URISyntaxException e) { 270 throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e); 271 } 272 } 273 274 /** 275 * Used to specify the entire HCat partition defining input for workflow job. <p/> Look for two evaluator-level 276 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the data-in HCat URI. 277 * <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something unresolved, 278 * this function will echo back the original function <p/> otherwise it sends the partition. 279 * 280 * @param dataInName : DataIn name 281 * @param type : for action type: hive-export 282 */ 283 public static String ph3_coord_dataInPartitions(String dataInName, String type) { 284 ELEvaluator eval = ELEvaluator.getCurrent(); 285 String uri = (String) eval.getVariable(".datain." + dataInName); 286 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); 287 if (unresolved != null && unresolved.booleanValue() == true) { 288 return "${coord:dataInPartitions('" + dataInName + "', '" + type + "')}"; 289 } 290 String partitionValue = null; 291 if (uri != null) { 292 if (type.equals("hive-export")) { 293 String[] uriList = uri.split(CoordELFunctions.DIR_SEPARATOR); 294 if (uriList.length > 1) { 295 throw new RuntimeException("Multiple partitions not supported for hive-export type. Dataset name: " 296 + dataInName + " URI: " + uri); 297 } 298 try { 299 partitionValue = new HCatURI(uri).toPartitionValueString(type); 300 } 301 catch (URISyntaxException e) { 302 throw new RuntimeException("Parsing exception for HCatURI " + uri, e); 303 } 304 } else { 305 throw new RuntimeException("Unsupported type: " + type + " dataset name: " + dataInName); 306 } 307 } 308 else { 309 XLog.getLog(HCatELFunctions.class).warn("URI is null"); 310 return null; 311 } 312 return partitionValue; 313 } 314 315 /** 316 * Used to specify the MAXIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level 317 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of 318 * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something 319 * unresolved, this function will echo back the original function <p/> otherwise it sends the max partition value. 320 * 321 * @param dataInName : Datain name 322 * @param partitionName : Specific partition name whose MAX value is wanted 323 */ 324 public static String ph3_coord_dataInPartitionMin(String dataInName, String partitionName) { 325 ELEvaluator eval = ELEvaluator.getCurrent(); 326 String uris = (String) eval.getVariable(".datain." + dataInName); 327 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); 328 if (unresolved != null && unresolved.booleanValue() == true) { 329 return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}"; 330 } 331 String minPartition = null; 332 if (uris != null) { 333 String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR); 334 // get the partition values list and find minimum 335 try { 336 // initialize minValue with first partition value 337 minPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName); 338 if (minPartition == null || minPartition.isEmpty()) { 339 throw new RuntimeException("No value in data-in uri for partition key: " + partitionName); 340 } 341 for (int i = 1; i < uriList.length; i++) { 342 String value = new HCatURI(uriList[i]).getPartitionValue(partitionName); 343 if(value.compareTo(minPartition) < 0) { //sticking to string comparison since some numerical date 344 //values can also contain letters e.g. 20120101T0300Z (UTC) 345 minPartition = value; 346 } 347 } 348 } 349 catch(URISyntaxException urie) { 350 throw new RuntimeException("HCat URI can't be parsed " + urie); 351 } 352 } 353 else { 354 XLog.getLog(HCatELFunctions.class).warn("URI is null"); 355 return null; 356 } 357 return minPartition; 358 } 359 360 /** 361 * Used to specify the MINIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level 362 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of 363 * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something 364 * unresolved, this function will echo back the original function <p/> otherwise it sends the min partition value. 365 * 366 * @param dataInName : Datain name 367 * @param partitionName : Specific partition name whose MIN value is wanted 368 */ 369 public static String ph3_coord_dataInPartitionMax(String dataInName, String partitionName) { 370 ELEvaluator eval = ELEvaluator.getCurrent(); 371 String uris = (String) eval.getVariable(".datain." + dataInName); 372 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); 373 if (unresolved != null && unresolved.booleanValue() == true) { 374 return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}"; 375 } 376 String maxPartition = null; 377 if (uris != null) { 378 String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR); 379 // get the partition values list and find minimum 380 try { 381 // initialize minValue with first partition value 382 maxPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName); 383 if (maxPartition == null || maxPartition.isEmpty()) { 384 throw new RuntimeException("No value in data-in uri for partition key: " + partitionName); 385 } 386 for(int i = 1; i < uriList.length; i++) { 387 String value = new HCatURI(uriList[i]).getPartitionValue(partitionName); 388 if(value.compareTo(maxPartition) > 0) { 389 maxPartition = value; 390 } 391 } 392 } 393 catch(URISyntaxException urie) { 394 throw new RuntimeException("HCat URI can't be parsed " + urie); 395 } 396 } 397 else { 398 XLog.getLog(HCatELFunctions.class).warn("URI is null"); 399 return null; 400 } 401 return maxPartition; 402 } 403 404 private static String createPartitionFilter(String uris, String type) { 405 String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR); 406 StringBuilder filter = new StringBuilder(""); 407 if (uriList.length > 0) { 408 for (String uri : uriList) { 409 if (filter.length() > 0) { 410 filter.append(" OR "); 411 } 412 try { 413 filter.append(new HCatURI(uri).toPartitionFilter(type)); 414 } 415 catch (URISyntaxException e) { 416 throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e); 417 } 418 } 419 } 420 return filter.toString(); 421 } 422 423 private static HCatURI getURIFromResolved(String dataInName, EventType type) { 424 final XLog LOG = XLog.getLog(HCatELFunctions.class); 425 StringBuilder uriTemplate = new StringBuilder(); 426 ELEvaluator eval = ELEvaluator.getCurrent(); 427 String uris; 428 if(type == EventType.input) { 429 uris = (String) eval.getVariable(".datain." + dataInName); 430 } 431 else { //type=output 432 uris = (String) eval.getVariable(".dataout." + dataInName); 433 } 434 if (uris != null) { 435 String[] uri = uris.split(CoordELFunctions.DIR_SEPARATOR, -1); 436 uriTemplate.append(uri[0]); 437 } 438 else { 439 LOG.warn("URI is NULL"); 440 return null; 441 } 442 LOG.info("uriTemplate [{0}] ", uriTemplate); 443 HCatURI hcatURI; 444 try { 445 hcatURI = new HCatURI(uriTemplate.toString()); 446 } 447 catch (URISyntaxException e) { 448 LOG.info("uriTemplate [{0}]. Reason [{1}]: ", uriTemplate, e); 449 throw new RuntimeException("HCat URI can't be parsed " + e); 450 } 451 return hcatURI; 452 } 453 454 private static boolean isValidDataEvent(String dataInName) { 455 ELEvaluator eval = ELEvaluator.getCurrent(); 456 String val = (String) eval.getVariable("oozie.dataname." + dataInName); 457 if (val == null || (val.equals("data-in") == false && val.equals("data-out") == false)) { 458 XLog.getLog(HCatELFunctions.class).error("dataset name " + dataInName + " is not valid. val :" + val); 459 throw new RuntimeException("data set name " + dataInName + " is not valid"); 460 } 461 return true; 462 } 463 464 private static String echoUnResolved(String functionName, String n) { 465 return echoUnResolvedPre(functionName, n, "coord:"); 466 } 467 468 private static String echoUnResolvedPre(String functionName, String n, String prefix) { 469 ELEvaluator eval = ELEvaluator.getCurrent(); 470 eval.setVariable(".wrap", "true"); 471 return prefix + functionName + "(" + n + ")"; // Unresolved 472 } 473 474}