001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.coord;
020
021import java.net.URI;
022import java.net.URISyntaxException;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.oozie.DagELFunctions;
026import org.apache.oozie.client.WorkflowJob;
027import org.apache.oozie.dependency.URIHandler;
028import org.apache.oozie.service.Services;
029import org.apache.oozie.service.URIHandlerService;
030import org.apache.oozie.util.ELEvaluator;
031import org.apache.oozie.util.HCatURI;
032import org.apache.oozie.util.XLog;
033
034/**
035 * This class implements the EL function for HCat datasets in coordinator
036 */
037
038public class HCatELFunctions {
039    private static final Configuration EMPTY_CONF = new Configuration(true);
040
041    enum EventType {
042        input, output
043    }
044
045    /* Workflow Parameterization EL functions */
046
047    /**
048     * Return true if partitions exists or false if not.
049     *
050     * @param uri hcatalog partition uri.
051     * @return <code>true</code> if the uri exists, <code>false</code> if it does not.
052     * @throws Exception
053     */
054    public static boolean hcat_exists(String uri) throws Exception {
055        URI hcatURI = new URI(uri);
056        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
057        URIHandler handler = uriService.getURIHandler(hcatURI);
058        WorkflowJob workflow = DagELFunctions.getWorkflow();
059        String user = workflow.getUser();
060        return handler.exists(hcatURI, EMPTY_CONF, user);
061    }
062
063    /* Coord EL functions */
064
065    /**
066     * Echo the same EL function without evaluating anything
067     *
068     * @param dataInName
069     * @return the same EL function
070     */
071    public static String ph1_coord_databaseIn_echo(String dataInName) {
072        // Checking if the dataIn is correct?
073        isValidDataEvent(dataInName);
074        return echoUnResolved("databaseIn", "'" + dataInName + "'");
075    }
076
077    public static String ph1_coord_databaseOut_echo(String dataName) {
078        // Checking if the dataOut is correct?
079        isValidDataEvent(dataName);
080        return echoUnResolved("databaseOut", "'" + dataName + "'");
081    }
082
083    public static String ph1_coord_tableIn_echo(String dataName) {
084        // Checking if the dataIn is correct?
085        isValidDataEvent(dataName);
086        return echoUnResolved("tableIn", "'" + dataName + "'");
087    }
088
089    public static String ph1_coord_tableOut_echo(String dataName) {
090        // Checking if the dataOut is correct?
091        isValidDataEvent(dataName);
092        return echoUnResolved("tableOut", "'" + dataName + "'");
093    }
094
095    public static String ph1_coord_dataInPartitionFilter_echo(String dataInName, String type) {
096        // Checking if the dataIn/dataOut is correct?
097        isValidDataEvent(dataInName);
098        return echoUnResolved("dataInPartitionFilter", "'" + dataInName + "', '" + type + "'");
099    }
100
101    public static String ph1_coord_dataInPartitionMin_echo(String dataInName, String partition) {
102        // Checking if the dataIn/dataOut is correct?
103        isValidDataEvent(dataInName);
104        return echoUnResolved("dataInPartitionMin", "'" + dataInName + "', '" + partition + "'");
105    }
106
107    public static String ph1_coord_dataInPartitionMax_echo(String dataInName, String partition) {
108        // Checking if the dataIn/dataOut is correct?
109        isValidDataEvent(dataInName);
110        return echoUnResolved("dataInPartitionMax", "'" + dataInName + "', '" + partition + "'");
111    }
112
113    public static String ph1_coord_dataOutPartitions_echo(String dataOutName) {
114        // Checking if the dataIn/dataOut is correct?
115        isValidDataEvent(dataOutName);
116        return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'");
117    }
118
119    public static String ph1_coord_dataInPartitions_echo(String dataInName, String type) {
120        // Checking if the dataIn/dataOut is correct?
121        isValidDataEvent(dataInName);
122        return echoUnResolved("dataInPartitions", "'" + dataInName + "', '" + type + "'");
123    }
124
125    public static String ph1_coord_dataOutPartitionValue_echo(String dataOutName, String partition) {
126        // Checking if the dataIn/dataOut is correct?
127        isValidDataEvent(dataOutName);
128        return echoUnResolved("dataOutPartitionValue", "'" + dataOutName + "', '" + partition + "'");
129    }
130
131    /**
132     * Extract the hcat DB name from the URI-template associate with
133     * 'dataInName'. Caller needs to specify the EL-evaluator level variable
134     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
135     * (SyncCoordDataset)
136     *
137     * @param dataInName
138     * @return DB name
139     */
140    public static String ph3_coord_databaseIn(String dataInName) {
141        HCatURI hcatURI = getURIFromResolved(dataInName, EventType.input);
142        if (hcatURI != null) {
143            return hcatURI.getDb();
144        }
145        else {
146            return "";
147        }
148    }
149
150    /**
151     * Extract the hcat DB name from the URI-template associate with
152     * 'dataOutName'. Caller needs to specify the EL-evaluator level variable
153     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
154     * (SyncCoordDataset)
155     *
156     * @param dataOutName
157     * @return DB name
158     */
159    public static String ph3_coord_databaseOut(String dataOutName) {
160        HCatURI hcatURI = getURIFromResolved(dataOutName, EventType.output);
161        if (hcatURI != null) {
162            return hcatURI.getDb();
163        }
164        else {
165            return "";
166        }
167    }
168
169    /**
170     * Extract the hcat Table name from the URI-template associate with
171     * 'dataInName'. Caller needs to specify the EL-evaluator level variable
172     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
173     * (SyncCoordDataset)
174     *
175     * @param dataInName
176     * @return Table name
177     */
178    public static String ph3_coord_tableIn(String dataInName) {
179        HCatURI hcatURI = getURIFromResolved(dataInName, EventType.input);
180        if (hcatURI != null) {
181            return hcatURI.getTable();
182        }
183        else {
184            return "";
185        }
186    }
187
188    /**
189     * Extract the hcat Table name from the URI-template associate with
190     * 'dataOutName'. Caller needs to specify the EL-evaluator level variable
191     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
192     * (SyncCoordDataset)
193     *
194     * @param dataOutName
195     * @return Table name
196     */
197    public static String ph3_coord_tableOut(String dataOutName) {
198        HCatURI hcatURI = getURIFromResolved(dataOutName, EventType.output);
199        if (hcatURI != null) {
200            return hcatURI.getTable();
201        }
202        else {
203            return "";
204        }
205    }
206
207    /**
208     * Used to specify the HCat partition filter which is input dependency for workflow job.<p> Look for two evaluator-level
209     * variables <p> A) .datain.&lt;DATAIN_NAME&gt; B) .datain.&lt;DATAIN_NAME&gt;.unresolved <p> A defines the current list of
210     * HCat URIs. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something
211     * unresolved, this function will echo back the original function <p> otherwise it sends the partition filter.
212     *
213     * @param dataInName : Datain name
214     * @param type : for action type - pig, MR or hive
215     */
216    public static String ph3_coord_dataInPartitionFilter(String dataInName, String type) {
217        ELEvaluator eval = ELEvaluator.getCurrent();
218        String uris = (String) eval.getVariable(".datain." + dataInName);
219        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
220        if (unresolved != null && unresolved.booleanValue() == true) {
221            return "${coord:dataInPartitionFilter('" + dataInName + "', '" + type + "')}";
222        }
223        return createPartitionFilter(uris, type);
224    }
225
226    /**
227     * Used to specify the HCat partition's value defining output for workflow job.<p> Look for two evaluator-level
228     * variables <p> A) .dataout.&lt;DATAOUT_NAME&gt; B) .dataout.&lt;DATAOUT_NAME&gt;.unresolved <p> A defines the current list of
229     * HCat URIs. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something
230     * unresolved, this function will echo back the original function <p> otherwise it sends the partition value.
231     *
232     * @param dataOutName : Dataout name
233     * @param partitionName : Specific partition name whose value is wanted
234     */
235    public static String ph3_coord_dataOutPartitionValue(String dataOutName, String partitionName) {
236        ELEvaluator eval = ELEvaluator.getCurrent();
237        String uri = (String) eval.getVariable(".dataout." + dataOutName);
238        Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
239        if (unresolved != null && unresolved.booleanValue() == true) {
240            return "${coord:dataOutPartitionValue('" + dataOutName + "', '" + partitionName + "')}";
241        }
242        try {
243            HCatURI hcatUri = new HCatURI(uri);
244            return hcatUri.getPartitionValue(partitionName);
245        }
246        catch(URISyntaxException urie) {
247            XLog.getLog(HCatELFunctions.class).warn("Exception with uriTemplate [{0}]. Reason [{1}]: ", uri, urie);
248            throw new RuntimeException("HCat URI can't be parsed " + urie);
249        }
250    }
251
252    /**
253     * Used to specify the entire HCat partition defining output for workflow job.<p> Look for two evaluator-level
254     * variables <p> A) .dataout.&lt;DATAOUT_NAME&gt; B) .dataout.&lt;DATAOUT_NAME&gt;.unresolved <p> A defines the data-out
255     * HCat URI. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something
256     * unresolved, this function will echo back the original function <p> otherwise it sends the partition.
257     *
258     * @param dataOutName : DataOut name
259     */
260    public static String ph3_coord_dataOutPartitions(String dataOutName) {
261        ELEvaluator eval = ELEvaluator.getCurrent();
262        String uri = (String) eval.getVariable(".dataout." + dataOutName);
263        Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
264        if (unresolved != null && unresolved.booleanValue() == true) {
265            return "${coord:dataOutPartitions('" + dataOutName + "')}";
266        }
267        try {
268            return new HCatURI(uri).toPartitionString();
269        }
270        catch (URISyntaxException e) {
271            throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e);
272        }
273    }
274
275    /**
276     * Used to specify the entire HCat partition defining input for workflow job. <p> Look for two evaluator-level
277     * variables <p> A) .datain.&lt;DATAIN_NAME&gt; B) .datain.&lt;DATAIN_NAME&gt;.unresolved <p> A defines the data-in HCat URI.
278     * <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something unresolved,
279     * this function will echo back the original function <p> otherwise it sends the partition.
280     *
281     * @param dataInName : DataIn name
282     * @param type : for action type: hive-export
283     */
284    public static String ph3_coord_dataInPartitions(String dataInName, String type) {
285        ELEvaluator eval = ELEvaluator.getCurrent();
286        String uri = (String) eval.getVariable(".datain." + dataInName);
287        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
288        if (unresolved != null && unresolved.booleanValue() == true) {
289            return "${coord:dataInPartitions('" + dataInName + "', '" + type + "')}";
290        }
291        String partitionValue = null;
292        if (uri != null) {
293            if (type.equals("hive-export")) {
294                String[] uriList = uri.split(CoordELFunctions.DIR_SEPARATOR);
295                if (uriList.length > 1) {
296                    throw new RuntimeException("Multiple partitions not supported for hive-export type. Dataset name: "
297                        + dataInName + " URI: " + uri);
298                }
299                try {
300                    partitionValue = new HCatURI(uri).toPartitionValueString(type);
301                }
302                catch (URISyntaxException e) {
303                    throw new RuntimeException("Parsing exception for HCatURI " + uri, e);
304                }
305            } else {
306                  throw new RuntimeException("Unsupported type: " + type + " dataset name: " + dataInName);
307            }
308        }
309        else {
310            XLog.getLog(HCatELFunctions.class).warn("URI is null");
311            return null;
312        }
313        return partitionValue;
314    }
315
316    /**
317     * Used to specify the MAXIMUM value of an HCat partition which is input dependency for workflow job.<p> Look for two evaluator-level
318     * variables <p> A) .datain.&lt;DATAIN_NAME&gt; B) .datain.&lt;DATAIN_NAME&gt;.unresolved <p> A defines the current list of
319     * HCat URIs. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something
320     * unresolved, this function will echo back the original function <p> otherwise it sends the max partition value.
321     *
322     * @param dataInName : Datain name
323     * @param partitionName : Specific partition name whose MAX value is wanted
324     */
325    public static String ph3_coord_dataInPartitionMin(String dataInName, String partitionName) {
326        ELEvaluator eval = ELEvaluator.getCurrent();
327        String uris = (String) eval.getVariable(".datain." + dataInName);
328        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
329        if (unresolved != null && unresolved.booleanValue() == true) {
330            return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}";
331        }
332        String minPartition = null;
333        if (uris != null) {
334            String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
335            // get the partition values list and find minimum
336            try {
337                // initialize minValue with first partition value
338                minPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName);
339                if (minPartition == null || minPartition.isEmpty()) {
340                    throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
341                }
342                for (int i = 1; i < uriList.length; i++) {
343                        String value = new HCatURI(uriList[i]).getPartitionValue(partitionName);
344                        if(value.compareTo(minPartition) < 0) { //sticking to string comparison since some numerical date
345                                                                //values can also contain letters e.g. 20120101T0300Z (UTC)
346                            minPartition = value;
347                        }
348                }
349            }
350            catch(URISyntaxException urie) {
351                throw new RuntimeException("HCat URI can't be parsed " + urie);
352            }
353        }
354        else {
355            XLog.getLog(HCatELFunctions.class).warn("URI is null");
356            return null;
357        }
358        return minPartition;
359    }
360
361    /**
362     * Used to specify the MINIMUM value of an HCat partition which is input dependency for workflow job.<p> Look for two evaluator-level
363     * variables <p> A) .datain.&lt;DATAIN_NAME&gt; B) .datain.&lt;DATAIN_NAME&gt;.unresolved <p> A defines the current list of
364     * HCat URIs. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something
365     * unresolved, this function will echo back the original function <p> otherwise it sends the min partition value.
366     *
367     * @param dataInName : Datain name
368     * @param partitionName : Specific partition name whose MIN value is wanted
369     */
370    public static String ph3_coord_dataInPartitionMax(String dataInName, String partitionName) {
371        ELEvaluator eval = ELEvaluator.getCurrent();
372        String uris = (String) eval.getVariable(".datain." + dataInName);
373        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
374        if (unresolved != null && unresolved.booleanValue() == true) {
375            return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}";
376        }
377        String maxPartition = null;
378        if (uris != null) {
379            String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
380            // get the partition values list and find minimum
381            try {
382                // initialize minValue with first partition value
383                maxPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName);
384                if (maxPartition == null || maxPartition.isEmpty()) {
385                    throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
386                }
387                for(int i = 1; i < uriList.length; i++) {
388                        String value = new HCatURI(uriList[i]).getPartitionValue(partitionName);
389                        if(value.compareTo(maxPartition) > 0) {
390                            maxPartition = value;
391                        }
392                }
393            }
394            catch(URISyntaxException urie) {
395                throw new RuntimeException("HCat URI can't be parsed " + urie);
396            }
397        }
398        else {
399            XLog.getLog(HCatELFunctions.class).warn("URI is null");
400            return null;
401        }
402        return maxPartition;
403    }
404
405    private static String createPartitionFilter(String uris, String type) {
406        String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
407        StringBuilder filter = new StringBuilder("");
408        if (uriList.length > 0) {
409            for (String uri : uriList) {
410                if (filter.length() > 0) {
411                    filter.append(" OR ");
412                }
413                try {
414                    filter.append(new HCatURI(uri).toPartitionFilter(type));
415                }
416                catch (URISyntaxException e) {
417                    throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e);
418                }
419            }
420        }
421        return filter.toString();
422    }
423
424    private static HCatURI getURIFromResolved(String dataInName, EventType type) {
425        final XLog LOG = XLog.getLog(HCatELFunctions.class);
426        StringBuilder uriTemplate = new StringBuilder();
427        ELEvaluator eval = ELEvaluator.getCurrent();
428        String uris;
429        if(type == EventType.input) {
430            uris = (String) eval.getVariable(".datain." + dataInName);
431        }
432        else { //type=output
433            uris = (String) eval.getVariable(".dataout." + dataInName);
434        }
435        if (uris != null) {
436            String[] uri = uris.split(CoordELFunctions.DIR_SEPARATOR, -1);
437            uriTemplate.append(uri[0]);
438        }
439        else {
440            LOG.warn("URI is NULL");
441            return null;
442        }
443        LOG.info("uriTemplate [{0}] ", uriTemplate);
444        HCatURI hcatURI;
445        try {
446            hcatURI = new HCatURI(uriTemplate.toString());
447        }
448        catch (URISyntaxException e) {
449            LOG.info("uriTemplate [{0}]. Reason [{1}]: ", uriTemplate, e);
450            throw new RuntimeException("HCat URI can't be parsed " + e);
451        }
452        return hcatURI;
453    }
454
455    private static boolean isValidDataEvent(String dataInName) {
456        ELEvaluator eval = ELEvaluator.getCurrent();
457        String val = (String) eval.getVariable("oozie.dataname." + dataInName);
458        if (val == null || (val.equals("data-in") == false && val.equals("data-out") == false)) {
459            XLog.getLog(HCatELFunctions.class).error("dataset name " + dataInName + " is not valid. val :" + val);
460            throw new RuntimeException("data set name " + dataInName + " is not valid");
461        }
462        return true;
463    }
464
465    private static String echoUnResolved(String functionName, String n) {
466        return echoUnResolvedPre(functionName, n, "coord:");
467    }
468
469    private static String echoUnResolvedPre(String functionName, String n, String prefix) {
470        ELEvaluator eval = ELEvaluator.getCurrent();
471        eval.setVariable(".wrap", "true");
472        return prefix + functionName + "(" + n + ")"; // Unresolved
473    }
474
475}