001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.coord;
019
020import java.net.URI;
021import java.net.URISyntaxException;
022
023import org.apache.hadoop.conf.Configuration;
024import org.apache.oozie.DagELFunctions;
025import org.apache.oozie.client.WorkflowJob;
026import org.apache.oozie.dependency.URIHandler;
027import org.apache.oozie.service.Services;
028import org.apache.oozie.service.URIHandlerService;
029import org.apache.oozie.util.ELEvaluator;
030import org.apache.oozie.util.HCatURI;
031import org.apache.oozie.util.XLog;
032
033/**
034 * This class implements the EL function for HCat datasets in coordinator
035 */
036
037public class HCatELFunctions {
038    private static final Configuration EMPTY_CONF = new Configuration(true);
039
040    enum EventType {
041        input, output
042    }
043
044    /* Workflow Parameterization EL functions */
045
046    /**
047     * Return true if partitions exists or false if not.
048     *
049     * @param uri hcatalog partition uri.
050     * @return <code>true</code> if the uri exists, <code>false</code> if it does not.
051     * @throws Exception
052     */
053    public static boolean hcat_exists(String uri) throws Exception {
054        URI hcatURI = new URI(uri);
055        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
056        URIHandler handler = uriService.getURIHandler(hcatURI);
057        WorkflowJob workflow = DagELFunctions.getWorkflow();
058        String user = workflow.getUser();
059        return handler.exists(hcatURI, EMPTY_CONF, user);
060    }
061
062    /* Coord EL functions */
063
064    /**
065     * Echo the same EL function without evaluating anything
066     *
067     * @param dataInName
068     * @return the same EL function
069     */
070    public static String ph1_coord_databaseIn_echo(String dataName) {
071        // Checking if the dataIn is correct?
072        isValidDataEvent(dataName);
073        return echoUnResolved("databaseIn", "'" + dataName + "'");
074    }
075
076    public static String ph1_coord_databaseOut_echo(String dataName) {
077        // Checking if the dataOut is correct?
078        isValidDataEvent(dataName);
079        return echoUnResolved("databaseOut", "'" + dataName + "'");
080    }
081
082    public static String ph1_coord_tableIn_echo(String dataName) {
083        // Checking if the dataIn is correct?
084        isValidDataEvent(dataName);
085        return echoUnResolved("tableIn", "'" + dataName + "'");
086    }
087
088    public static String ph1_coord_tableOut_echo(String dataName) {
089        // Checking if the dataOut is correct?
090        isValidDataEvent(dataName);
091        return echoUnResolved("tableOut", "'" + dataName + "'");
092    }
093
094    public static String ph1_coord_dataInPartitionFilter_echo(String dataInName, String type) {
095        // Checking if the dataIn/dataOut is correct?
096        isValidDataEvent(dataInName);
097        return echoUnResolved("dataInPartitionFilter", "'" + dataInName + "', '" + type + "'");
098    }
099
100    public static String ph1_coord_dataInPartitionMin_echo(String dataInName, String partition) {
101        // Checking if the dataIn/dataOut is correct?
102        isValidDataEvent(dataInName);
103        return echoUnResolved("dataInPartitionMin", "'" + dataInName + "', '" + partition + "'");
104    }
105
106    public static String ph1_coord_dataInPartitionMax_echo(String dataInName, String partition) {
107        // Checking if the dataIn/dataOut is correct?
108        isValidDataEvent(dataInName);
109        return echoUnResolved("dataInPartitionMax", "'" + dataInName + "', '" + partition + "'");
110    }
111
112    public static String ph1_coord_dataOutPartitions_echo(String dataOutName) {
113        // Checking if the dataIn/dataOut is correct?
114        isValidDataEvent(dataOutName);
115        return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'");
116    }
117
118    public static String ph1_coord_dataInPartitions_echo(String dataInName, String type) {
119        // Checking if the dataIn/dataOut is correct?
120        isValidDataEvent(dataInName);
121        return echoUnResolved("dataInPartitions", "'" + dataInName + "', '" + type + "'");
122    }
123
124    public static String ph1_coord_dataOutPartitionValue_echo(String dataOutName, String partition) {
125        // Checking if the dataIn/dataOut is correct?
126        isValidDataEvent(dataOutName);
127        return echoUnResolved("dataOutPartitionValue", "'" + dataOutName + "', '" + partition + "'");
128    }
129
130    /**
131     * Extract the hcat DB name from the URI-template associate with
132     * 'dataInName'. Caller needs to specify the EL-evaluator level variable
133     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
134     * (SyncCoordDataset)
135     *
136     * @param dataInName
137     * @return DB name
138     */
139    public static String ph3_coord_databaseIn(String dataName) {
140        HCatURI hcatURI = getURIFromResolved(dataName, EventType.input);
141        if (hcatURI != null) {
142            return hcatURI.getDb();
143        }
144        else {
145            return "";
146        }
147    }
148
149    /**
150     * Extract the hcat DB name from the URI-template associate with
151     * 'dataOutName'. Caller needs to specify the EL-evaluator level variable
152     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
153     * (SyncCoordDataset)
154     *
155     * @param dataOutName
156     * @return DB name
157     */
158    public static String ph3_coord_databaseOut(String dataName) {
159        HCatURI hcatURI = getURIFromResolved(dataName, EventType.output);
160        if (hcatURI != null) {
161            return hcatURI.getDb();
162        }
163        else {
164            return "";
165        }
166    }
167
168    /**
169     * Extract the hcat Table name from the URI-template associate with
170     * 'dataInName'. Caller needs to specify the EL-evaluator level variable
171     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
172     * (SyncCoordDataset)
173     *
174     * @param dataInName
175     * @return Table name
176     */
177    public static String ph3_coord_tableIn(String dataName) {
178        HCatURI hcatURI = getURIFromResolved(dataName, EventType.input);
179        if (hcatURI != null) {
180            return hcatURI.getTable();
181        }
182        else {
183            return "";
184        }
185    }
186
187    /**
188     * Extract the hcat Table name from the URI-template associate with
189     * 'dataOutName'. Caller needs to specify the EL-evaluator level variable
190     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
191     * (SyncCoordDataset)
192     *
193     * @param dataOutName
194     * @return Table name
195     */
196    public static String ph3_coord_tableOut(String dataName) {
197        HCatURI hcatURI = getURIFromResolved(dataName, EventType.output);
198        if (hcatURI != null) {
199            return hcatURI.getTable();
200        }
201        else {
202            return "";
203        }
204    }
205
206    /**
207     * Used to specify the HCat partition filter which is input dependency for workflow job.<p/> Look for two evaluator-level
208     * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
209     * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
210     * unresolved, this function will echo back the original function <p/> otherwise it sends the partition filter.
211     *
212     * @param dataInName : Datain name
213     * @param type : for action type - pig, MR or hive
214     */
215    public static String ph3_coord_dataInPartitionFilter(String dataInName, String type) {
216        ELEvaluator eval = ELEvaluator.getCurrent();
217        String uris = (String) eval.getVariable(".datain." + dataInName);
218        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
219        if (unresolved != null && unresolved.booleanValue() == true) {
220            return "${coord:dataInPartitionFilter('" + dataInName + "', '" + type + "')}";
221        }
222        return createPartitionFilter(uris, type);
223    }
224
225    /**
226     * Used to specify the HCat partition's value defining output for workflow job.<p/> Look for two evaluator-level
227     * variables <p/> A) .dataout.<DATAOUT_NAME> B) .dataout.<DATAOUT_NAME>.unresolved <p/> A defines the current list of
228     * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
229     * unresolved, this function will echo back the original function <p/> otherwise it sends the partition value.
230     *
231     * @param dataOutName : Dataout name
232     * @param partitionName : Specific partition name whose value is wanted
233     */
234    public static String ph3_coord_dataOutPartitionValue(String dataOutName, String partitionName) {
235        ELEvaluator eval = ELEvaluator.getCurrent();
236        String uri = (String) eval.getVariable(".dataout." + dataOutName);
237        Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
238        if (unresolved != null && unresolved.booleanValue() == true) {
239            return "${coord:dataOutPartitionValue('" + dataOutName + "', '" + partitionName + "')}";
240        }
241        try {
242            HCatURI hcatUri = new HCatURI(uri);
243            return hcatUri.getPartitionValue(partitionName);
244        }
245        catch(URISyntaxException urie) {
246            XLog.getLog(HCatELFunctions.class).warn("Exception with uriTemplate [{0}]. Reason [{1}]: ", uri, urie);
247            throw new RuntimeException("HCat URI can't be parsed " + urie);
248        }
249    }
250
251    /**
252     * Used to specify the entire HCat partition defining output for workflow job.<p/> Look for two evaluator-level
253     * variables <p/> A) .dataout.<DATAOUT_NAME> B) .dataout.<DATAOUT_NAME>.unresolved <p/> A defines the data-out
254     * HCat URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
255     * unresolved, this function will echo back the original function <p/> otherwise it sends the partition.
256     *
257     * @param dataOutName : DataOut name
258     */
259    public static String ph3_coord_dataOutPartitions(String dataOutName) {
260        ELEvaluator eval = ELEvaluator.getCurrent();
261        String uri = (String) eval.getVariable(".dataout." + dataOutName);
262        Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
263        if (unresolved != null && unresolved.booleanValue() == true) {
264            return "${coord:dataOutPartitions('" + dataOutName + "')}";
265        }
266        try {
267            return new HCatURI(uri).toPartitionString();
268        }
269        catch (URISyntaxException e) {
270            throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e);
271        }
272    }
273
274    /**
275     * Used to specify the entire HCat partition defining input for workflow job. <p/> Look for two evaluator-level
276     * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the data-in HCat URI.
277     * <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something unresolved,
278     * this function will echo back the original function <p/> otherwise it sends the partition.
279     *
280     * @param dataInName : DataIn name
281     * @param type : for action type: hive-export
282     */
283    public static String ph3_coord_dataInPartitions(String dataInName, String type) {
284        ELEvaluator eval = ELEvaluator.getCurrent();
285        String uri = (String) eval.getVariable(".datain." + dataInName);
286        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
287        if (unresolved != null && unresolved.booleanValue() == true) {
288            return "${coord:dataInPartitions('" + dataInName + "', '" + type + "')}";
289        }
290        String partitionValue = null;
291        if (uri != null) {
292            if (type.equals("hive-export")) {
293                String[] uriList = uri.split(CoordELFunctions.DIR_SEPARATOR);
294                if (uriList.length > 1) {
295                    throw new RuntimeException("Multiple partitions not supported for hive-export type. Dataset name: "
296                        + dataInName + " URI: " + uri);
297                }
298                try {
299                    partitionValue = new HCatURI(uri).toPartitionValueString(type);
300                }
301                catch (URISyntaxException e) {
302                    throw new RuntimeException("Parsing exception for HCatURI " + uri, e);
303                }
304            } else {
305                  throw new RuntimeException("Unsupported type: " + type + " dataset name: " + dataInName);
306            }
307        }
308        else {
309            XLog.getLog(HCatELFunctions.class).warn("URI is null");
310            return null;
311        }
312        return partitionValue;
313    }
314
315    /**
316     * Used to specify the MAXIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level
317     * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
318     * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
319     * unresolved, this function will echo back the original function <p/> otherwise it sends the max partition value.
320     *
321     * @param dataInName : Datain name
322     * @param partitionName : Specific partition name whose MAX value is wanted
323     */
324    public static String ph3_coord_dataInPartitionMin(String dataInName, String partitionName) {
325        ELEvaluator eval = ELEvaluator.getCurrent();
326        String uris = (String) eval.getVariable(".datain." + dataInName);
327        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
328        if (unresolved != null && unresolved.booleanValue() == true) {
329            return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}";
330        }
331        String minPartition = null;
332        if (uris != null) {
333            String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
334            // get the partition values list and find minimum
335            try {
336                // initialize minValue with first partition value
337                minPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName);
338                if (minPartition == null || minPartition.isEmpty()) {
339                    throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
340                }
341                for (int i = 1; i < uriList.length; i++) {
342                        String value = new HCatURI(uriList[i]).getPartitionValue(partitionName);
343                        if(value.compareTo(minPartition) < 0) { //sticking to string comparison since some numerical date
344                                                                //values can also contain letters e.g. 20120101T0300Z (UTC)
345                            minPartition = value;
346                        }
347                }
348            }
349            catch(URISyntaxException urie) {
350                throw new RuntimeException("HCat URI can't be parsed " + urie);
351            }
352        }
353        else {
354            XLog.getLog(HCatELFunctions.class).warn("URI is null");
355            return null;
356        }
357        return minPartition;
358    }
359
360    /**
361     * Used to specify the MINIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level
362     * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
363     * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
364     * unresolved, this function will echo back the original function <p/> otherwise it sends the min partition value.
365     *
366     * @param dataInName : Datain name
367     * @param partitionName : Specific partition name whose MIN value is wanted
368     */
369    public static String ph3_coord_dataInPartitionMax(String dataInName, String partitionName) {
370        ELEvaluator eval = ELEvaluator.getCurrent();
371        String uris = (String) eval.getVariable(".datain." + dataInName);
372        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
373        if (unresolved != null && unresolved.booleanValue() == true) {
374            return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}";
375        }
376        String maxPartition = null;
377        if (uris != null) {
378            String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
379            // get the partition values list and find minimum
380            try {
381                // initialize minValue with first partition value
382                maxPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName);
383                if (maxPartition == null || maxPartition.isEmpty()) {
384                    throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
385                }
386                for(int i = 1; i < uriList.length; i++) {
387                        String value = new HCatURI(uriList[i]).getPartitionValue(partitionName);
388                        if(value.compareTo(maxPartition) > 0) {
389                            maxPartition = value;
390                        }
391                }
392            }
393            catch(URISyntaxException urie) {
394                throw new RuntimeException("HCat URI can't be parsed " + urie);
395            }
396        }
397        else {
398            XLog.getLog(HCatELFunctions.class).warn("URI is null");
399            return null;
400        }
401        return maxPartition;
402    }
403
404    private static String createPartitionFilter(String uris, String type) {
405        String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
406        StringBuilder filter = new StringBuilder("");
407        if (uriList.length > 0) {
408            for (String uri : uriList) {
409                if (filter.length() > 0) {
410                    filter.append(" OR ");
411                }
412                try {
413                    filter.append(new HCatURI(uri).toPartitionFilter(type));
414                }
415                catch (URISyntaxException e) {
416                    throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e);
417                }
418            }
419        }
420        return filter.toString();
421    }
422
423    private static HCatURI getURIFromResolved(String dataInName, EventType type) {
424        final XLog LOG = XLog.getLog(HCatELFunctions.class);
425        StringBuilder uriTemplate = new StringBuilder();
426        ELEvaluator eval = ELEvaluator.getCurrent();
427        String uris;
428        if(type == EventType.input) {
429            uris = (String) eval.getVariable(".datain." + dataInName);
430        }
431        else { //type=output
432            uris = (String) eval.getVariable(".dataout." + dataInName);
433        }
434        if (uris != null) {
435            String[] uri = uris.split(CoordELFunctions.DIR_SEPARATOR, -1);
436            uriTemplate.append(uri[0]);
437        }
438        else {
439            LOG.warn("URI is NULL");
440            return null;
441        }
442        LOG.info("uriTemplate [{0}] ", uriTemplate);
443        HCatURI hcatURI;
444        try {
445            hcatURI = new HCatURI(uriTemplate.toString());
446        }
447        catch (URISyntaxException e) {
448            LOG.info("uriTemplate [{0}]. Reason [{1}]: ", uriTemplate, e);
449            throw new RuntimeException("HCat URI can't be parsed " + e);
450        }
451        return hcatURI;
452    }
453
454    private static boolean isValidDataEvent(String dataInName) {
455        ELEvaluator eval = ELEvaluator.getCurrent();
456        String val = (String) eval.getVariable("oozie.dataname." + dataInName);
457        if (val == null || (val.equals("data-in") == false && val.equals("data-out") == false)) {
458            XLog.getLog(HCatELFunctions.class).error("dataset name " + dataInName + " is not valid. val :" + val);
459            throw new RuntimeException("data set name " + dataInName + " is not valid");
460        }
461        return true;
462    }
463
464    private static String echoUnResolved(String functionName, String n) {
465        return echoUnResolvedPre(functionName, n, "coord:");
466    }
467
468    private static String echoUnResolvedPre(String functionName, String n, String prefix) {
469        ELEvaluator eval = ELEvaluator.getCurrent();
470        eval.setVariable(".wrap", "true");
471        return prefix + functionName + "(" + n + ")"; // Unresolved
472    }
473
474}