/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.coord;

import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.DagELFunctions;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XLog;

/**
 * This class implements the EL functions for HCat datasets in coordinators.
 * <p/>
 * The {@code ph1_*_echo} functions validate the dataset name and echo the
 * call back unevaluated; the {@code ph3_*} functions resolve the call against
 * the {@code .datain.<name>} / {@code .dataout.<name>} evaluator variables.
 */
public class HCatELFunctions {
    // NOTE: constructed with loadDefaults=true, so despite its name this
    // Configuration carries Hadoop's default resources. Passed to URIHandler.exists().
    private static final Configuration EMPTY_CONF = new Configuration(true);

    enum EventType {
        input, output
    }

    /* Workflow Parameterization EL functions */

    /**
     * Return true if the partition exists or false if not.
     *
     * @param uri hcatalog partition uri.
     * @return <code>true</code> if the uri exists, <code>false</code> if it does not.
     * @throws Exception propagated from URI parsing ({@link URISyntaxException}) or from the
     *         {@link URIHandler} resolved for the URI.
     */
    public static boolean hcat_exists(String uri) throws Exception {
        URI hcatURI = new URI(uri);
        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
        URIHandler handler = uriService.getURIHandler(hcatURI);
        WorkflowJob workflow = DagELFunctions.getWorkflow();
        // the existence check is performed on behalf of the workflow's user
        String user = workflow.getUser();
        return handler.exists(hcatURI, EMPTY_CONF, user);
    }

    /* Coord EL functions */

    /**
     * Echo the same EL function without evaluating anything.
     *
     * @param dataName name of a data-in or data-out event
     * @return the unevaluated function call {@code coord:databaseIn('dataName')}
     * @throws RuntimeException if {@code dataName} is not a declared data-in/data-out
     */
    public static String ph1_coord_databaseIn_echo(String dataName) {
        // Checking if the dataIn is correct?
        isValidDataEvent(dataName);
        return echoUnResolved("databaseIn", "'" + dataName + "'");
    }

    /**
     * Echo {@code coord:databaseOut('dataName')} without evaluating it.
     *
     * @param dataName name of a data-in or data-out event
     * @return the unevaluated function call
     * @throws RuntimeException if {@code dataName} is not a declared data-in/data-out
     */
    public static String ph1_coord_databaseOut_echo(String dataName) {
        // Checking if the dataOut is correct?
        isValidDataEvent(dataName);
        return echoUnResolved("databaseOut", "'" + dataName + "'");
    }

    /**
     * Echo {@code coord:tableIn('dataName')} without evaluating it.
     *
     * @param dataName name of a data-in or data-out event
     * @return the unevaluated function call
     * @throws RuntimeException if {@code dataName} is not a declared data-in/data-out
     */
    public static String ph1_coord_tableIn_echo(String dataName) {
        // Checking if the dataIn is correct?
        isValidDataEvent(dataName);
        return echoUnResolved("tableIn", "'" + dataName + "'");
    }

    /**
     * Echo {@code coord:tableOut('dataName')} without evaluating it.
     *
     * @param dataName name of a data-in or data-out event
     * @return the unevaluated function call
     * @throws RuntimeException if {@code dataName} is not a declared data-in/data-out
     */
    public static String ph1_coord_tableOut_echo(String dataName) {
        // Checking if the dataOut is correct?
        isValidDataEvent(dataName);
        return echoUnResolved("tableOut", "'" + dataName + "'");
    }

    /**
     * Echo {@code coord:dataInPartitionFilter('dataInName', 'type')} without evaluating it.
     *
     * @param dataInName name of a data-in or data-out event
     * @param type action type the filter is meant for (e.g. pig, MR or hive)
     * @return the unevaluated function call
     * @throws RuntimeException if {@code dataInName} is not a declared data-in/data-out
     */
    public static String ph1_coord_dataInPartitionFilter_echo(String dataInName, String type) {
        // Checking if the dataIn/dataOut is correct?
        isValidDataEvent(dataInName);
        return echoUnResolved("dataInPartitionFilter", "'" + dataInName + "', '" + type + "'");
    }

    /**
     * Echo {@code coord:dataInPartitionMin('dataInName', 'partition')} without evaluating it.
     *
     * @param dataInName name of a data-in or data-out event
     * @param partition partition key
     * @return the unevaluated function call
     * @throws RuntimeException if {@code dataInName} is not a declared data-in/data-out
     */
    public static String ph1_coord_dataInPartitionMin_echo(String dataInName, String partition) {
        // Checking if the dataIn/dataOut is correct?
        isValidDataEvent(dataInName);
        return echoUnResolved("dataInPartitionMin", "'" + dataInName + "', '" + partition + "'");
    }

    /**
     * Echo {@code coord:dataInPartitionMax('dataInName', 'partition')} without evaluating it.
     *
     * @param dataInName name of a data-in or data-out event
     * @param partition partition key
     * @return the unevaluated function call
     * @throws RuntimeException if {@code dataInName} is not a declared data-in/data-out
     */
    public static String ph1_coord_dataInPartitionMax_echo(String dataInName, String partition) {
        // Checking if the dataIn/dataOut is correct?
        isValidDataEvent(dataInName);
        return echoUnResolved("dataInPartitionMax", "'" + dataInName + "', '" + partition + "'");
    }

    /**
     * Echo {@code coord:dataOutPartitions('dataOutName')} without evaluating it.
     *
     * @param dataOutName name of a data-in or data-out event
     * @return the unevaluated function call
     * @throws RuntimeException if {@code dataOutName} is not a declared data-in/data-out
     */
    public static String ph1_coord_dataOutPartitions_echo(String dataOutName) {
        // Checking if the dataIn/dataOut is correct?
        isValidDataEvent(dataOutName);
        return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'");
    }

    /**
     * Echo {@code coord:dataOutPartitionValue('dataOutName', 'partition')} without evaluating it.
     *
     * @param dataOutName name of a data-in or data-out event
     * @param partition partition key
     * @return the unevaluated function call
     * @throws RuntimeException if {@code dataOutName} is not a declared data-in/data-out
     */
    public static String ph1_coord_dataOutPartitionValue_echo(String dataOutName, String partition) {
        // Checking if the dataIn/dataOut is correct?
        isValidDataEvent(dataOutName);
        return echoUnResolved("dataOutPartitionValue", "'" + dataOutName + "', '" + partition + "'");
    }

    /**
     * Extract the hcat DB name from the first URI of the resolved data-in
     * 'dataName' (evaluator variable {@code .datain.<dataName>}).
     *
     * @param dataName data-in name
     * @return DB name, or an empty string if the variable is not set
     * @throws RuntimeException if the URI cannot be parsed
     */
    public static String ph3_coord_databaseIn(String dataName) {
        HCatURI hcatURI = getURIFromResolved(dataName, EventType.input);
        if (hcatURI != null) {
            return hcatURI.getDb();
        }
        else {
            return "";
        }
    }

    /**
     * Extract the hcat DB name from the first URI of the resolved data-out
     * 'dataName' (evaluator variable {@code .dataout.<dataName>}).
     *
     * @param dataName data-out name
     * @return DB name, or an empty string if the variable is not set
     * @throws RuntimeException if the URI cannot be parsed
     */
    public static String ph3_coord_databaseOut(String dataName) {
        HCatURI hcatURI = getURIFromResolved(dataName, EventType.output);
        if (hcatURI != null) {
            return hcatURI.getDb();
        }
        else {
            return "";
        }
    }

    /**
     * Extract the hcat Table name from the first URI of the resolved data-in
     * 'dataName' (evaluator variable {@code .datain.<dataName>}).
     *
     * @param dataName data-in name
     * @return Table name, or an empty string if the variable is not set
     * @throws RuntimeException if the URI cannot be parsed
     */
    public static String ph3_coord_tableIn(String dataName) {
        HCatURI hcatURI = getURIFromResolved(dataName, EventType.input);
        if (hcatURI != null) {
            return hcatURI.getTable();
        }
        else {
            return "";
        }
    }

    /**
     * Extract the hcat Table name from the first URI of the resolved data-out
     * 'dataName' (evaluator variable {@code .dataout.<dataName>}).
     *
     * @param dataName data-out name
     * @return Table name, or an empty string if the variable is not set
     * @throws RuntimeException if the URI cannot be parsed
     */
    public static String ph3_coord_tableOut(String dataName) {
        HCatURI hcatURI = getURIFromResolved(dataName, EventType.output);
        if (hcatURI != null) {
            return hcatURI.getTable();
        }
        else {
            return "";
        }
    }

    /**
     * Used to specify the HCat partition filter which is input dependency for workflow job.<p/> Look for two
     * evaluator-level variables <p/> A) .datain.&lt;DATAIN_NAME&gt; B) .datain.&lt;DATAIN_NAME&gt;.unresolved <p/>
     * A defines the current list of HCat URIs. <p/> B defines whether there are any unresolved EL-function
     * (i.e latest) <p/> If there is something unresolved, this function echoes back the original function;
     * otherwise it returns the partition filter.
     *
     * @param dataInName : Datain name
     * @param type : for action type - pig, MR or hive
     * @return the per-URI partition filters joined with {@code " OR "}, or the echoed EL call if unresolved
     * @throws RuntimeException if any URI cannot be parsed
     */
    public static String ph3_coord_dataInPartitionFilter(String dataInName, String type) {
        ELEvaluator eval = ELEvaluator.getCurrent();
        String uris = (String) eval.getVariable(".datain." + dataInName);
        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
        if (Boolean.TRUE.equals(unresolved)) {
            return "${coord:dataInPartitionFilter('" + dataInName + "', '" + type + "')}";
        }
        return createPartitionFilter(uris, type);
    }

    /**
     * Used to specify the HCat partition's value defining output for workflow job.<p/> Look for two
     * evaluator-level variables <p/> A) .dataout.&lt;DATAOUT_NAME&gt; B) .dataout.&lt;DATAOUT_NAME&gt;.unresolved
     * <p/> A defines the data-out HCat URI. <p/> B defines whether there are any unresolved EL-function
     * (i.e latest) <p/> If there is something unresolved, this function echoes back the original function;
     * otherwise it returns the partition value.
     *
     * @param dataOutName : Dataout name
     * @param partitionName : Specific partition name whose value is wanted
     * @return the partition value as returned by {@link HCatURI#getPartitionValue(String)}, or the echoed
     *         EL call if unresolved
     * @throws RuntimeException if the URI cannot be parsed
     */
    public static String ph3_coord_dataOutPartitionValue(String dataOutName, String partitionName) {
        ELEvaluator eval = ELEvaluator.getCurrent();
        String uri = (String) eval.getVariable(".dataout." + dataOutName);
        Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
        if (Boolean.TRUE.equals(unresolved)) {
            return "${coord:dataOutPartitionValue('" + dataOutName + "', '" + partitionName + "')}";
        }
        try {
            HCatURI hcatUri = new HCatURI(uri);
            return hcatUri.getPartitionValue(partitionName);
        }
        catch (URISyntaxException urie) {
            XLog.getLog(HCatELFunctions.class).warn("Exception with uriTemplate [{0}]. Reason [{1}]: ", uri, urie);
            throw new RuntimeException("HCat URI can't be parsed " + urie, urie);
        }
    }

    /**
     * Used to specify the entire HCat partition defining output for workflow job.<p/> Look for two
     * evaluator-level variables <p/> A) .dataout.&lt;DATAOUT_NAME&gt; B) .dataout.&lt;DATAOUT_NAME&gt;.unresolved
     * <p/> A defines the data-out HCat URI. <p/> B defines whether there are any unresolved EL-function
     * (i.e latest) <p/> If there is something unresolved, this function echoes back the original function;
     * otherwise it returns the partition.
     *
     * @param dataOutName : DataOut name
     * @return the partition string from {@link HCatURI#toPartitionString()}, or the echoed EL call if unresolved
     * @throws RuntimeException if the URI cannot be parsed
     */
    public static String ph3_coord_dataOutPartitions(String dataOutName) {
        ELEvaluator eval = ELEvaluator.getCurrent();
        String uri = (String) eval.getVariable(".dataout." + dataOutName);
        Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
        if (Boolean.TRUE.equals(unresolved)) {
            return "${coord:dataOutPartitions('" + dataOutName + "')}";
        }
        try {
            return new HCatURI(uri).toPartitionString();
        }
        catch (URISyntaxException e) {
            throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e, e);
        }
    }

    /**
     * Used to specify the MINIMUM value of an HCat partition which is input dependency for workflow job.<p/>
     * Look for two evaluator-level variables <p/> A) .datain.&lt;DATAIN_NAME&gt;
     * B) .datain.&lt;DATAIN_NAME&gt;.unresolved <p/> A defines the current list of HCat URIs. <p/> B defines
     * whether there are any unresolved EL-function (i.e latest) <p/> If there is something unresolved, this
     * function echoes back the original function; otherwise it returns the min partition value.
     * Values are compared as strings.
     *
     * @param dataInName : Datain name
     * @param partitionName : Specific partition name whose MIN value is wanted
     * @return the minimum partition value, the echoed EL call if unresolved, or <code>null</code> (with a
     *         warning logged) if the data-in variable is not set
     * @throws RuntimeException if a URI cannot be parsed or has no value for {@code partitionName}
     */
    public static String ph3_coord_dataInPartitionMin(String dataInName, String partitionName) {
        return findPartitionExtreme(dataInName, partitionName, "dataInPartitionMin", false);
    }

    /**
     * Used to specify the MAXIMUM value of an HCat partition which is input dependency for workflow job.<p/>
     * Look for two evaluator-level variables <p/> A) .datain.&lt;DATAIN_NAME&gt;
     * B) .datain.&lt;DATAIN_NAME&gt;.unresolved <p/> A defines the current list of HCat URIs. <p/> B defines
     * whether there are any unresolved EL-function (i.e latest) <p/> If there is something unresolved, this
     * function echoes back the original function; otherwise it returns the max partition value.
     * Values are compared as strings.
     *
     * @param dataInName : Datain name
     * @param partitionName : Specific partition name whose MAX value is wanted
     * @return the maximum partition value, the echoed EL call if unresolved, or <code>null</code> (with a
     *         warning logged) if the data-in variable is not set
     * @throws RuntimeException if a URI cannot be parsed or has no value for {@code partitionName}
     */
    public static String ph3_coord_dataInPartitionMax(String dataInName, String partitionName) {
        return findPartitionExtreme(dataInName, partitionName, "dataInPartitionMax", true);
    }

    /**
     * Shared implementation of {@code dataInPartitionMin} / {@code dataInPartitionMax}.
     *
     * @param dataInName data-in name
     * @param partitionName partition key whose extreme value is wanted
     * @param functionName EL function name to echo back when the data-in is still unresolved
     * @param findMax <code>true</code> to return the greatest value, <code>false</code> for the least
     * @return the extreme value, the echoed EL call, or <code>null</code> if the data-in variable is not set
     */
    private static String findPartitionExtreme(String dataInName, String partitionName, String functionName,
            boolean findMax) {
        ELEvaluator eval = ELEvaluator.getCurrent();
        String uris = (String) eval.getVariable(".datain." + dataInName);
        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
        if (Boolean.TRUE.equals(unresolved)) {
            return "${coord:" + functionName + "('" + dataInName + "', '" + partitionName + "')}";
        }
        if (uris == null) {
            XLog.getLog(HCatELFunctions.class).warn("URI is null");
            return null;
        }
        String extreme = null;
        try {
            for (String uri : uris.split(CoordELFunctions.DIR_SEPARATOR)) {
                String value = new HCatURI(uri).getPartitionValue(partitionName);
                // every URI must carry the key, not just the first one; a missing value would
                // otherwise NPE in compareTo (or an empty one would silently win the min)
                if (value == null || value.isEmpty()) {
                    throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
                }
                if (extreme == null) {
                    extreme = value;
                    continue;
                }
                // sticking to string comparison since some numerical date values can also
                // contain letters e.g. 20120101T0300Z (UTC)
                int cmp = value.compareTo(extreme);
                if (findMax ? cmp > 0 : cmp < 0) {
                    extreme = value;
                }
            }
        }
        catch (URISyntaxException urie) {
            throw new RuntimeException("HCat URI can't be parsed " + urie, urie);
        }
        return extreme;
    }

    /**
     * Build an OR-joined partition filter from a {@link CoordELFunctions#DIR_SEPARATOR}-separated URI list.
     *
     * @param uris separated list of HCat URIs (must not be <code>null</code>)
     * @param type action type passed through to {@link HCatURI#toPartitionFilter(String)}
     * @return the per-URI filters joined with {@code " OR "}
     * @throws RuntimeException if any URI cannot be parsed
     */
    private static String createPartitionFilter(String uris, String type) {
        StringBuilder filter = new StringBuilder();
        for (String uri : uris.split(CoordELFunctions.DIR_SEPARATOR)) {
            if (filter.length() > 0) {
                filter.append(" OR ");
            }
            try {
                filter.append(new HCatURI(uri).toPartitionFilter(type));
            }
            catch (URISyntaxException e) {
                throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e, e);
            }
        }
        return filter.toString();
    }

    /**
     * Parse the first URI of the resolved data-in/data-out list for {@code dataName}.
     *
     * @param dataName data-in or data-out name
     * @param type selects the {@code .datain.} or {@code .dataout.} evaluator variable
     * @return the parsed first URI, or <code>null</code> (with a warning logged) if the variable is not set
     * @throws RuntimeException if the URI cannot be parsed
     */
    private static HCatURI getURIFromResolved(String dataName, EventType type) {
        final XLog LOG = XLog.getLog(HCatELFunctions.class);
        ELEvaluator eval = ELEvaluator.getCurrent();
        String variable = (type == EventType.input) ? ".datain." + dataName : ".dataout." + dataName;
        String uris = (String) eval.getVariable(variable);
        if (uris == null) {
            LOG.warn("URI is NULL");
            return null;
        }
        // only the first URI is needed: db/table are taken from it
        String uriTemplate = uris.split(CoordELFunctions.DIR_SEPARATOR, -1)[0];
        LOG.info("uriTemplate [{0}] ", uriTemplate);
        try {
            return new HCatURI(uriTemplate);
        }
        catch (URISyntaxException e) {
            LOG.warn("uriTemplate [{0}]. Reason [{1}]: ", uriTemplate, e);
            throw new RuntimeException("HCat URI can't be parsed " + e, e);
        }
    }

    /**
     * Check that {@code dataInName} was declared as a data-in or data-out (evaluator variable
     * {@code oozie.dataname.<name>}).
     *
     * @param dataInName data-in or data-out name
     * @return always <code>true</code>
     * @throws RuntimeException if the name is unknown or not a data-in/data-out
     */
    private static boolean isValidDataEvent(String dataInName) {
        ELEvaluator eval = ELEvaluator.getCurrent();
        String val = (String) eval.getVariable("oozie.dataname." + dataInName);
        if (val == null || (!val.equals("data-in") && !val.equals("data-out"))) {
            XLog.getLog(HCatELFunctions.class).error("dataset name " + dataInName + " is not valid. val :" + val);
            throw new RuntimeException("data set name " + dataInName + " is not valid");
        }
        return true;
    }

    /**
     * Return {@code coord:<functionName>(<n>)} and set the evaluator's {@code .wrap} flag.
     */
    private static String echoUnResolved(String functionName, String n) {
        return echoUnResolvedPre(functionName, n, "coord:");
    }

    /**
     * Return {@code <prefix><functionName>(<n>)} and set the evaluator variable {@code .wrap} to "true".
     */
    private static String echoUnResolvedPre(String functionName, String n, String prefix) {
        ELEvaluator eval = ELEvaluator.getCurrent();
        eval.setVariable(".wrap", "true");
        return prefix + functionName + "(" + n + ")"; // Unresolved
    }

}