This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.coord;
019
020 import java.net.URI;
021 import java.net.URISyntaxException;
022
023 import org.apache.hadoop.conf.Configuration;
024 import org.apache.oozie.DagELFunctions;
025 import org.apache.oozie.client.WorkflowJob;
026 import org.apache.oozie.dependency.URIHandler;
027 import org.apache.oozie.service.Services;
028 import org.apache.oozie.service.URIHandlerService;
029 import org.apache.oozie.util.ELEvaluator;
030 import org.apache.oozie.util.HCatURI;
031 import org.apache.oozie.util.XLog;
032
033 /**
034 * This class implements the EL function for HCat datasets in coordinator
035 */
036
037 public class HCatELFunctions {
038 private static final Configuration EMPTY_CONF = new Configuration(true);
039
040 enum EventType {
041 input, output
042 }
043
044 /* Workflow Parameterization EL functions */
045
046 /**
047 * Return true if partitions exists or false if not.
048 *
049 * @param uri hcatalog partition uri.
050 * @return <code>true</code> if the uri exists, <code>false</code> if it does not.
051 * @throws Exception
052 */
053 public static boolean hcat_exists(String uri) throws Exception {
054 URI hcatURI = new URI(uri);
055 URIHandlerService uriService = Services.get().get(URIHandlerService.class);
056 URIHandler handler = uriService.getURIHandler(hcatURI);
057 WorkflowJob workflow = DagELFunctions.getWorkflow();
058 String user = workflow.getUser();
059 return handler.exists(hcatURI, EMPTY_CONF, user);
060 }
061
062 /* Coord EL functions */
063
064 /**
065 * Echo the same EL function without evaluating anything
066 *
067 * @param dataInName
068 * @return the same EL function
069 */
070 public static String ph1_coord_databaseIn_echo(String dataName) {
071 // Checking if the dataIn is correct?
072 isValidDataEvent(dataName);
073 return echoUnResolved("databaseIn", "'" + dataName + "'");
074 }
075
076 public static String ph1_coord_databaseOut_echo(String dataName) {
077 // Checking if the dataOut is correct?
078 isValidDataEvent(dataName);
079 return echoUnResolved("databaseOut", "'" + dataName + "'");
080 }
081
082 public static String ph1_coord_tableIn_echo(String dataName) {
083 // Checking if the dataIn is correct?
084 isValidDataEvent(dataName);
085 return echoUnResolved("tableIn", "'" + dataName + "'");
086 }
087
088 public static String ph1_coord_tableOut_echo(String dataName) {
089 // Checking if the dataOut is correct?
090 isValidDataEvent(dataName);
091 return echoUnResolved("tableOut", "'" + dataName + "'");
092 }
093
094 public static String ph1_coord_dataInPartitionFilter_echo(String dataInName, String type) {
095 // Checking if the dataIn/dataOut is correct?
096 isValidDataEvent(dataInName);
097 return echoUnResolved("dataInPartitionFilter", "'" + dataInName + "', '" + type + "'");
098 }
099
100 public static String ph1_coord_dataInPartitionMin_echo(String dataInName, String partition) {
101 // Checking if the dataIn/dataOut is correct?
102 isValidDataEvent(dataInName);
103 return echoUnResolved("dataInPartitionMin", "'" + dataInName + "', '" + partition + "'");
104 }
105
106 public static String ph1_coord_dataInPartitionMax_echo(String dataInName, String partition) {
107 // Checking if the dataIn/dataOut is correct?
108 isValidDataEvent(dataInName);
109 return echoUnResolved("dataInPartitionMax", "'" + dataInName + "', '" + partition + "'");
110 }
111
112 public static String ph1_coord_dataOutPartitions_echo(String dataOutName) {
113 // Checking if the dataIn/dataOut is correct?
114 isValidDataEvent(dataOutName);
115 return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'");
116 }
117
118 public static String ph1_coord_dataOutPartitionValue_echo(String dataOutName, String partition) {
119 // Checking if the dataIn/dataOut is correct?
120 isValidDataEvent(dataOutName);
121 return echoUnResolved("dataOutPartitionValue", "'" + dataOutName + "', '" + partition + "'");
122 }
123
124 /**
125 * Extract the hcat DB name from the URI-template associate with
126 * 'dataInName'. Caller needs to specify the EL-evaluator level variable
127 * 'oozie.coord.el.dataset.bean' with synchronous dataset object
128 * (SyncCoordDataset)
129 *
130 * @param dataInName
131 * @return DB name
132 */
133 public static String ph3_coord_databaseIn(String dataName) {
134 HCatURI hcatURI = getURIFromResolved(dataName, EventType.input);
135 if (hcatURI != null) {
136 return hcatURI.getDb();
137 }
138 else {
139 return "";
140 }
141 }
142
143 /**
144 * Extract the hcat DB name from the URI-template associate with
145 * 'dataOutName'. Caller needs to specify the EL-evaluator level variable
146 * 'oozie.coord.el.dataset.bean' with synchronous dataset object
147 * (SyncCoordDataset)
148 *
149 * @param dataOutName
150 * @return DB name
151 */
152 public static String ph3_coord_databaseOut(String dataName) {
153 HCatURI hcatURI = getURIFromResolved(dataName, EventType.output);
154 if (hcatURI != null) {
155 return hcatURI.getDb();
156 }
157 else {
158 return "";
159 }
160 }
161
162 /**
163 * Extract the hcat Table name from the URI-template associate with
164 * 'dataInName'. Caller needs to specify the EL-evaluator level variable
165 * 'oozie.coord.el.dataset.bean' with synchronous dataset object
166 * (SyncCoordDataset)
167 *
168 * @param dataInName
169 * @return Table name
170 */
171 public static String ph3_coord_tableIn(String dataName) {
172 HCatURI hcatURI = getURIFromResolved(dataName, EventType.input);
173 if (hcatURI != null) {
174 return hcatURI.getTable();
175 }
176 else {
177 return "";
178 }
179 }
180
181 /**
182 * Extract the hcat Table name from the URI-template associate with
183 * 'dataOutName'. Caller needs to specify the EL-evaluator level variable
184 * 'oozie.coord.el.dataset.bean' with synchronous dataset object
185 * (SyncCoordDataset)
186 *
187 * @param dataOutName
188 * @return Table name
189 */
190 public static String ph3_coord_tableOut(String dataName) {
191 HCatURI hcatURI = getURIFromResolved(dataName, EventType.output);
192 if (hcatURI != null) {
193 return hcatURI.getTable();
194 }
195 else {
196 return "";
197 }
198 }
199
200 /**
201 * Used to specify the HCat partition filter which is input dependency for workflow job.<p/> Look for two evaluator-level
202 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
203 * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
204 * unresolved, this function will echo back the original function <p/> otherwise it sends the partition filter.
205 *
206 * @param dataInName : Datain name
207 * @param type : for action type - pig, MR or hive
208 */
209 public static String ph3_coord_dataInPartitionFilter(String dataInName, String type) {
210 ELEvaluator eval = ELEvaluator.getCurrent();
211 String uris = (String) eval.getVariable(".datain." + dataInName);
212 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
213 if (unresolved != null && unresolved.booleanValue() == true) {
214 return "${coord:dataInPartitionFilter('" + dataInName + "', '" + type + "')}";
215 }
216 return createPartitionFilter(uris, type);
217 }
218
219 /**
220 * Used to specify the HCat partition's value defining output for workflow job.<p/> Look for two evaluator-level
221 * variables <p/> A) .dataout.<DATAOUT_NAME> B) .dataout.<DATAOUT_NAME>.unresolved <p/> A defines the current list of
222 * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
223 * unresolved, this function will echo back the original function <p/> otherwise it sends the partition value.
224 *
225 * @param dataOutName : Dataout name
226 * @param partitionName : Specific partition name whose value is wanted
227 */
228 public static String ph3_coord_dataOutPartitionValue(String dataOutName, String partitionName) {
229 ELEvaluator eval = ELEvaluator.getCurrent();
230 String uri = (String) eval.getVariable(".dataout." + dataOutName);
231 Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
232 if (unresolved != null && unresolved.booleanValue() == true) {
233 return "${coord:dataOutPartitionValue('" + dataOutName + "', '" + partitionName + "')}";
234 }
235 try {
236 HCatURI hcatUri = new HCatURI(uri);
237 return hcatUri.getPartitionValue(partitionName);
238 }
239 catch(URISyntaxException urie) {
240 XLog.getLog(HCatELFunctions.class).warn("Exception with uriTemplate [{0}]. Reason [{1}]: ", uri, urie);
241 throw new RuntimeException("HCat URI can't be parsed " + urie);
242 }
243 }
244
245 /**
246 * Used to specify the entire HCat partition defining output for workflow job.<p/> Look for two evaluator-level
247 * variables <p/> A) .dataout.<DATAOUT_NAME> B) .dataout.<DATAOUT_NAME>.unresolved <p/> A defines the data-out
248 * HCat URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
249 * unresolved, this function will echo back the original function <p/> otherwise it sends the partition.
250 *
251 * @param dataOutName : DataOut name
252 */
253 public static String ph3_coord_dataOutPartitions(String dataOutName) {
254 ELEvaluator eval = ELEvaluator.getCurrent();
255 String uri = (String) eval.getVariable(".dataout." + dataOutName);
256 Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
257 if (unresolved != null && unresolved.booleanValue() == true) {
258 return "${coord:dataOutPartitions('" + dataOutName + "')}";
259 }
260 try {
261 return new HCatURI(uri).toPartitionString();
262 }
263 catch (URISyntaxException e) {
264 throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e);
265 }
266 }
267
268 /**
269 * Used to specify the MAXIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level
270 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
271 * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
272 * unresolved, this function will echo back the original function <p/> otherwise it sends the max partition value.
273 *
274 * @param dataInName : Datain name
275 * @param partitionName : Specific partition name whose MAX value is wanted
276 */
277 public static String ph3_coord_dataInPartitionMin(String dataInName, String partitionName) {
278 ELEvaluator eval = ELEvaluator.getCurrent();
279 String uris = (String) eval.getVariable(".datain." + dataInName);
280 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
281 if (unresolved != null && unresolved.booleanValue() == true) {
282 return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}";
283 }
284 String minPartition = null;
285 if (uris != null) {
286 String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
287 // get the partition values list and find minimum
288 try {
289 // initialize minValue with first partition value
290 minPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName);
291 if (minPartition == null || minPartition.isEmpty()) {
292 throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
293 }
294 for (int i = 1; i < uriList.length; i++) {
295 String value = new HCatURI(uriList[i]).getPartitionValue(partitionName);
296 if(value.compareTo(minPartition) < 0) { //sticking to string comparison since some numerical date
297 //values can also contain letters e.g. 20120101T0300Z (UTC)
298 minPartition = value;
299 }
300 }
301 }
302 catch(URISyntaxException urie) {
303 throw new RuntimeException("HCat URI can't be parsed " + urie);
304 }
305 }
306 else {
307 XLog.getLog(HCatELFunctions.class).warn("URI is null");
308 return null;
309 }
310 return minPartition;
311 }
312
313 /**
314 * Used to specify the MINIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level
315 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
316 * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
317 * unresolved, this function will echo back the original function <p/> otherwise it sends the min partition value.
318 *
319 * @param dataInName : Datain name
320 * @param partitionName : Specific partition name whose MIN value is wanted
321 */
322 public static String ph3_coord_dataInPartitionMax(String dataInName, String partitionName) {
323 ELEvaluator eval = ELEvaluator.getCurrent();
324 String uris = (String) eval.getVariable(".datain." + dataInName);
325 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
326 if (unresolved != null && unresolved.booleanValue() == true) {
327 return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}";
328 }
329 String maxPartition = null;
330 if (uris != null) {
331 String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
332 // get the partition values list and find minimum
333 try {
334 // initialize minValue with first partition value
335 maxPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName);
336 if (maxPartition == null || maxPartition.isEmpty()) {
337 throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
338 }
339 for(int i = 1; i < uriList.length; i++) {
340 String value = new HCatURI(uriList[i]).getPartitionValue(partitionName);
341 if(value.compareTo(maxPartition) > 0) {
342 maxPartition = value;
343 }
344 }
345 }
346 catch(URISyntaxException urie) {
347 throw new RuntimeException("HCat URI can't be parsed " + urie);
348 }
349 }
350 else {
351 XLog.getLog(HCatELFunctions.class).warn("URI is null");
352 return null;
353 }
354 return maxPartition;
355 }
356
357 private static String createPartitionFilter(String uris, String type) {
358 String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
359 StringBuilder filter = new StringBuilder("");
360 if (uriList.length > 0) {
361 for (String uri : uriList) {
362 if (filter.length() > 0) {
363 filter.append(" OR ");
364 }
365 try {
366 filter.append(new HCatURI(uri).toPartitionFilter(type));
367 }
368 catch (URISyntaxException e) {
369 throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e);
370 }
371 }
372 }
373 return filter.toString();
374 }
375
376 private static HCatURI getURIFromResolved(String dataInName, EventType type) {
377 final XLog LOG = XLog.getLog(HCatELFunctions.class);
378 StringBuilder uriTemplate = new StringBuilder();
379 ELEvaluator eval = ELEvaluator.getCurrent();
380 String uris;
381 if(type == EventType.input) {
382 uris = (String) eval.getVariable(".datain." + dataInName);
383 }
384 else { //type=output
385 uris = (String) eval.getVariable(".dataout." + dataInName);
386 }
387 if (uris != null) {
388 String[] uri = uris.split(CoordELFunctions.DIR_SEPARATOR, -1);
389 uriTemplate.append(uri[0]);
390 }
391 else {
392 LOG.warn("URI is NULL");
393 return null;
394 }
395 LOG.info("uriTemplate [{0}] ", uriTemplate);
396 HCatURI hcatURI;
397 try {
398 hcatURI = new HCatURI(uriTemplate.toString());
399 }
400 catch (URISyntaxException e) {
401 LOG.info("uriTemplate [{0}]. Reason [{1}]: ", uriTemplate, e);
402 throw new RuntimeException("HCat URI can't be parsed " + e);
403 }
404 return hcatURI;
405 }
406
407 private static boolean isValidDataEvent(String dataInName) {
408 ELEvaluator eval = ELEvaluator.getCurrent();
409 String val = (String) eval.getVariable("oozie.dataname." + dataInName);
410 if (val == null || (val.equals("data-in") == false && val.equals("data-out") == false)) {
411 XLog.getLog(HCatELFunctions.class).error("dataset name " + dataInName + " is not valid. val :" + val);
412 throw new RuntimeException("data set name " + dataInName + " is not valid");
413 }
414 return true;
415 }
416
417 private static String echoUnResolved(String functionName, String n) {
418 return echoUnResolvedPre(functionName, n, "coord:");
419 }
420
421 private static String echoUnResolvedPre(String functionName, String n, String prefix) {
422 ELEvaluator eval = ELEvaluator.getCurrent();
423 eval.setVariable(".wrap", "true");
424 return prefix + functionName + "(" + n + ")"; // Unresolved
425 }
426
427 }