001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.coord;
019    
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    
023    import org.apache.hadoop.conf.Configuration;
024    import org.apache.oozie.DagELFunctions;
025    import org.apache.oozie.client.WorkflowJob;
026    import org.apache.oozie.dependency.URIHandler;
027    import org.apache.oozie.service.Services;
028    import org.apache.oozie.service.URIHandlerService;
029    import org.apache.oozie.util.ELEvaluator;
030    import org.apache.oozie.util.HCatURI;
031    import org.apache.oozie.util.XLog;
032    
/**
 * This class implements the EL functions for HCat datasets: the workflow
 * parameterization function and the coordinator functions.
 */
036    
037    public class HCatELFunctions {
    // Configuration passed to URIHandler.exists() by hcat_exists().
    // NOTE(review): built with Configuration(true), which presumably loads the Hadoop default
    // resources, so "empty" likely means "no job-specific settings" -- confirm.
    private static final Configuration EMPTY_CONF = new Configuration(true);
039    
040        enum EventType {
041            input, output
042        }
043    
044        /* Workflow Parameterization EL functions */
045    
046        /**
047         * Return true if partitions exists or false if not.
048         *
049         * @param uri hcatalog partition uri.
050         * @return <code>true</code> if the uri exists, <code>false</code> if it does not.
051         * @throws Exception
052         */
053        public static boolean hcat_exists(String uri) throws Exception {
054            URI hcatURI = new URI(uri);
055            URIHandlerService uriService = Services.get().get(URIHandlerService.class);
056            URIHandler handler = uriService.getURIHandler(hcatURI);
057            WorkflowJob workflow = DagELFunctions.getWorkflow();
058            String user = workflow.getUser();
059            return handler.exists(hcatURI, EMPTY_CONF, user);
060        }
061    
062        /* Coord EL functions */
063    
064        /**
065         * Echo the same EL function without evaluating anything
066         *
067         * @param dataInName
068         * @return the same EL function
069         */
070        public static String ph1_coord_databaseIn_echo(String dataName) {
071            // Checking if the dataIn is correct?
072            isValidDataEvent(dataName);
073            return echoUnResolved("databaseIn", "'" + dataName + "'");
074        }
075    
076        public static String ph1_coord_databaseOut_echo(String dataName) {
077            // Checking if the dataOut is correct?
078            isValidDataEvent(dataName);
079            return echoUnResolved("databaseOut", "'" + dataName + "'");
080        }
081    
082        public static String ph1_coord_tableIn_echo(String dataName) {
083            // Checking if the dataIn is correct?
084            isValidDataEvent(dataName);
085            return echoUnResolved("tableIn", "'" + dataName + "'");
086        }
087    
088        public static String ph1_coord_tableOut_echo(String dataName) {
089            // Checking if the dataOut is correct?
090            isValidDataEvent(dataName);
091            return echoUnResolved("tableOut", "'" + dataName + "'");
092        }
093    
094        public static String ph1_coord_dataInPartitionFilter_echo(String dataInName, String type) {
095            // Checking if the dataIn/dataOut is correct?
096            isValidDataEvent(dataInName);
097            return echoUnResolved("dataInPartitionFilter", "'" + dataInName + "', '" + type + "'");
098        }
099    
100        public static String ph1_coord_dataInPartitionMin_echo(String dataInName, String partition) {
101            // Checking if the dataIn/dataOut is correct?
102            isValidDataEvent(dataInName);
103            return echoUnResolved("dataInPartitionMin", "'" + dataInName + "', '" + partition + "'");
104        }
105    
106        public static String ph1_coord_dataInPartitionMax_echo(String dataInName, String partition) {
107            // Checking if the dataIn/dataOut is correct?
108            isValidDataEvent(dataInName);
109            return echoUnResolved("dataInPartitionMax", "'" + dataInName + "', '" + partition + "'");
110        }
111    
112        public static String ph1_coord_dataOutPartitions_echo(String dataOutName) {
113            // Checking if the dataIn/dataOut is correct?
114            isValidDataEvent(dataOutName);
115            return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'");
116        }
117    
118        public static String ph1_coord_dataOutPartitionValue_echo(String dataOutName, String partition) {
119            // Checking if the dataIn/dataOut is correct?
120            isValidDataEvent(dataOutName);
121            return echoUnResolved("dataOutPartitionValue", "'" + dataOutName + "', '" + partition + "'");
122        }
123    
124        /**
125         * Extract the hcat DB name from the URI-template associate with
126         * 'dataInName'. Caller needs to specify the EL-evaluator level variable
127         * 'oozie.coord.el.dataset.bean' with synchronous dataset object
128         * (SyncCoordDataset)
129         *
130         * @param dataInName
131         * @return DB name
132         */
133        public static String ph3_coord_databaseIn(String dataName) {
134            HCatURI hcatURI = getURIFromResolved(dataName, EventType.input);
135            if (hcatURI != null) {
136                return hcatURI.getDb();
137            }
138            else {
139                return "";
140            }
141        }
142    
143        /**
144         * Extract the hcat DB name from the URI-template associate with
145         * 'dataOutName'. Caller needs to specify the EL-evaluator level variable
146         * 'oozie.coord.el.dataset.bean' with synchronous dataset object
147         * (SyncCoordDataset)
148         *
149         * @param dataOutName
150         * @return DB name
151         */
152        public static String ph3_coord_databaseOut(String dataName) {
153            HCatURI hcatURI = getURIFromResolved(dataName, EventType.output);
154            if (hcatURI != null) {
155                return hcatURI.getDb();
156            }
157            else {
158                return "";
159            }
160        }
161    
162        /**
163         * Extract the hcat Table name from the URI-template associate with
164         * 'dataInName'. Caller needs to specify the EL-evaluator level variable
165         * 'oozie.coord.el.dataset.bean' with synchronous dataset object
166         * (SyncCoordDataset)
167         *
168         * @param dataInName
169         * @return Table name
170         */
171        public static String ph3_coord_tableIn(String dataName) {
172            HCatURI hcatURI = getURIFromResolved(dataName, EventType.input);
173            if (hcatURI != null) {
174                return hcatURI.getTable();
175            }
176            else {
177                return "";
178            }
179        }
180    
181        /**
182         * Extract the hcat Table name from the URI-template associate with
183         * 'dataOutName'. Caller needs to specify the EL-evaluator level variable
184         * 'oozie.coord.el.dataset.bean' with synchronous dataset object
185         * (SyncCoordDataset)
186         *
187         * @param dataOutName
188         * @return Table name
189         */
190        public static String ph3_coord_tableOut(String dataName) {
191            HCatURI hcatURI = getURIFromResolved(dataName, EventType.output);
192            if (hcatURI != null) {
193                return hcatURI.getTable();
194            }
195            else {
196                return "";
197            }
198        }
199    
200        /**
201         * Used to specify the HCat partition filter which is input dependency for workflow job.<p/> Look for two evaluator-level
202         * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
203         * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
204         * unresolved, this function will echo back the original function <p/> otherwise it sends the partition filter.
205         *
206         * @param dataInName : Datain name
207         * @param type : for action type - pig, MR or hive
208         */
209        public static String ph3_coord_dataInPartitionFilter(String dataInName, String type) {
210            ELEvaluator eval = ELEvaluator.getCurrent();
211            String uris = (String) eval.getVariable(".datain." + dataInName);
212            Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
213            if (unresolved != null && unresolved.booleanValue() == true) {
214                return "${coord:dataInPartitionFilter('" + dataInName + "', '" + type + "')}";
215            }
216            return createPartitionFilter(uris, type);
217        }
218    
219        /**
220         * Used to specify the HCat partition's value defining output for workflow job.<p/> Look for two evaluator-level
221         * variables <p/> A) .dataout.<DATAOUT_NAME> B) .dataout.<DATAOUT_NAME>.unresolved <p/> A defines the current list of
222         * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
223         * unresolved, this function will echo back the original function <p/> otherwise it sends the partition value.
224         *
225         * @param dataOutName : Dataout name
226         * @param partitionName : Specific partition name whose value is wanted
227         */
228        public static String ph3_coord_dataOutPartitionValue(String dataOutName, String partitionName) {
229            ELEvaluator eval = ELEvaluator.getCurrent();
230            String uri = (String) eval.getVariable(".dataout." + dataOutName);
231            Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
232            if (unresolved != null && unresolved.booleanValue() == true) {
233                return "${coord:dataOutPartitionValue('" + dataOutName + "', '" + partitionName + "')}";
234            }
235            try {
236                HCatURI hcatUri = new HCatURI(uri);
237                return hcatUri.getPartitionValue(partitionName);
238            }
239            catch(URISyntaxException urie) {
240                XLog.getLog(HCatELFunctions.class).warn("Exception with uriTemplate [{0}]. Reason [{1}]: ", uri, urie);
241                throw new RuntimeException("HCat URI can't be parsed " + urie);
242            }
243        }
244    
245        /**
246         * Used to specify the entire HCat partition defining output for workflow job.<p/> Look for two evaluator-level
247         * variables <p/> A) .dataout.<DATAOUT_NAME> B) .dataout.<DATAOUT_NAME>.unresolved <p/> A defines the data-out
248         * HCat URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
249         * unresolved, this function will echo back the original function <p/> otherwise it sends the partition.
250         *
251         * @param dataOutName : DataOut name
252         */
253        public static String ph3_coord_dataOutPartitions(String dataOutName) {
254            ELEvaluator eval = ELEvaluator.getCurrent();
255            String uri = (String) eval.getVariable(".dataout." + dataOutName);
256            Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
257            if (unresolved != null && unresolved.booleanValue() == true) {
258                return "${coord:dataOutPartitions('" + dataOutName + "')}";
259            }
260            try {
261                return new HCatURI(uri).toPartitionString();
262            }
263            catch (URISyntaxException e) {
264                throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e);
265            }
266        }
267    
268        /**
269         * Used to specify the MAXIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level
270         * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
271         * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
272         * unresolved, this function will echo back the original function <p/> otherwise it sends the max partition value.
273         *
274         * @param dataInName : Datain name
275         * @param partitionName : Specific partition name whose MAX value is wanted
276         */
277        public static String ph3_coord_dataInPartitionMin(String dataInName, String partitionName) {
278            ELEvaluator eval = ELEvaluator.getCurrent();
279            String uris = (String) eval.getVariable(".datain." + dataInName);
280            Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
281            if (unresolved != null && unresolved.booleanValue() == true) {
282                return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}";
283            }
284            String minPartition = null;
285            if (uris != null) {
286                String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
287                // get the partition values list and find minimum
288                try {
289                    // initialize minValue with first partition value
290                    minPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName);
291                    if (minPartition == null || minPartition.isEmpty()) {
292                        throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
293                    }
294                    for (int i = 1; i < uriList.length; i++) {
295                            String value = new HCatURI(uriList[i]).getPartitionValue(partitionName);
296                            if(value.compareTo(minPartition) < 0) { //sticking to string comparison since some numerical date
297                                                                    //values can also contain letters e.g. 20120101T0300Z (UTC)
298                                minPartition = value;
299                            }
300                    }
301                }
302                catch(URISyntaxException urie) {
303                    throw new RuntimeException("HCat URI can't be parsed " + urie);
304                }
305            }
306            else {
307                XLog.getLog(HCatELFunctions.class).warn("URI is null");
308                return null;
309            }
310            return minPartition;
311        }
312    
313        /**
314         * Used to specify the MINIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level
315         * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
316         * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
317         * unresolved, this function will echo back the original function <p/> otherwise it sends the min partition value.
318         *
319         * @param dataInName : Datain name
320         * @param partitionName : Specific partition name whose MIN value is wanted
321         */
322        public static String ph3_coord_dataInPartitionMax(String dataInName, String partitionName) {
323            ELEvaluator eval = ELEvaluator.getCurrent();
324            String uris = (String) eval.getVariable(".datain." + dataInName);
325            Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
326            if (unresolved != null && unresolved.booleanValue() == true) {
327                return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}";
328            }
329            String maxPartition = null;
330            if (uris != null) {
331                String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
332                // get the partition values list and find minimum
333                try {
334                    // initialize minValue with first partition value
335                    maxPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName);
336                    if (maxPartition == null || maxPartition.isEmpty()) {
337                        throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
338                    }
339                    for(int i = 1; i < uriList.length; i++) {
340                            String value = new HCatURI(uriList[i]).getPartitionValue(partitionName);
341                            if(value.compareTo(maxPartition) > 0) {
342                                maxPartition = value;
343                            }
344                    }
345                }
346                catch(URISyntaxException urie) {
347                    throw new RuntimeException("HCat URI can't be parsed " + urie);
348                }
349            }
350            else {
351                XLog.getLog(HCatELFunctions.class).warn("URI is null");
352                return null;
353            }
354            return maxPartition;
355        }
356    
357        private static String createPartitionFilter(String uris, String type) {
358            String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
359            StringBuilder filter = new StringBuilder("");
360            if (uriList.length > 0) {
361                for (String uri : uriList) {
362                    if (filter.length() > 0) {
363                        filter.append(" OR ");
364                    }
365                    try {
366                        filter.append(new HCatURI(uri).toPartitionFilter(type));
367                    }
368                    catch (URISyntaxException e) {
369                        throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e);
370                    }
371                }
372            }
373            return filter.toString();
374        }
375    
376        private static HCatURI getURIFromResolved(String dataInName, EventType type) {
377            final XLog LOG = XLog.getLog(HCatELFunctions.class);
378            StringBuilder uriTemplate = new StringBuilder();
379            ELEvaluator eval = ELEvaluator.getCurrent();
380            String uris;
381            if(type == EventType.input) {
382                uris = (String) eval.getVariable(".datain." + dataInName);
383            }
384            else { //type=output
385                uris = (String) eval.getVariable(".dataout." + dataInName);
386            }
387            if (uris != null) {
388                String[] uri = uris.split(CoordELFunctions.DIR_SEPARATOR, -1);
389                uriTemplate.append(uri[0]);
390            }
391            else {
392                LOG.warn("URI is NULL");
393                return null;
394            }
395            LOG.info("uriTemplate [{0}] ", uriTemplate);
396            HCatURI hcatURI;
397            try {
398                hcatURI = new HCatURI(uriTemplate.toString());
399            }
400            catch (URISyntaxException e) {
401                LOG.info("uriTemplate [{0}]. Reason [{1}]: ", uriTemplate, e);
402                throw new RuntimeException("HCat URI can't be parsed " + e);
403            }
404            return hcatURI;
405        }
406    
407        private static boolean isValidDataEvent(String dataInName) {
408            ELEvaluator eval = ELEvaluator.getCurrent();
409            String val = (String) eval.getVariable("oozie.dataname." + dataInName);
410            if (val == null || (val.equals("data-in") == false && val.equals("data-out") == false)) {
411                XLog.getLog(HCatELFunctions.class).error("dataset name " + dataInName + " is not valid. val :" + val);
412                throw new RuntimeException("data set name " + dataInName + " is not valid");
413            }
414            return true;
415        }
416    
417        private static String echoUnResolved(String functionName, String n) {
418            return echoUnResolvedPre(functionName, n, "coord:");
419        }
420    
421        private static String echoUnResolvedPre(String functionName, String n, String prefix) {
422            ELEvaluator eval = ELEvaluator.getCurrent();
423            eval.setVariable(".wrap", "true");
424            return prefix + functionName + "(" + n + ")"; // Unresolved
425        }
426    
427    }