001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import java.io.IOException;
021    import java.io.Writer;
022    import java.text.ParseException;
023    import java.util.ArrayList;
024    import java.util.Date;
025    import java.util.HashMap;
026    import java.util.HashSet;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.StringTokenizer;
031    
032    import org.apache.hadoop.conf.Configuration;
033    import org.apache.oozie.client.CoordinatorAction;
034    import org.apache.oozie.client.CoordinatorJob;
035    import org.apache.oozie.client.Job;
036    import org.apache.oozie.client.OozieClient;
037    import org.apache.oozie.client.WorkflowJob;
038    import org.apache.oozie.client.rest.BulkResponseImpl;
039    import org.apache.oozie.command.BulkJobsXCommand;
040    import org.apache.oozie.command.CommandException;
041    import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
042    import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
043    import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
044    import org.apache.oozie.command.bundle.BundleJobXCommand;
045    import org.apache.oozie.command.bundle.BundleJobsXCommand;
046    import org.apache.oozie.command.bundle.BundleKillXCommand;
047    import org.apache.oozie.command.bundle.BundleRerunXCommand;
048    import org.apache.oozie.command.bundle.BundleStartXCommand;
049    import org.apache.oozie.command.bundle.BundleSubmitXCommand;
050    import org.apache.oozie.service.DagXLogInfoService;
051    import org.apache.oozie.service.Services;
052    import org.apache.oozie.service.XLogService;
053    import org.apache.oozie.util.DateUtils;
054    import org.apache.oozie.util.ParamChecker;
055    import org.apache.oozie.util.XLog;
056    import org.apache.oozie.util.XLogStreamer;
057    
058    public class BundleEngine extends BaseEngine {
059        /**
060         * Create a system Bundle engine, with no user and no group.
061         */
062        public BundleEngine() {
063        }
064    
065        /**
066         * Create a Bundle engine to perform operations on behave of a user.
067         *
068         * @param user user name.
069         * @param authToken the authentication token.
070         */
071        public BundleEngine(String user, String authToken) {
072            this.user = ParamChecker.notEmpty(user, "user");
073            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
074        }
075    
076        /* (non-Javadoc)
077         * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
078         */
079        @Override
080        public void change(String jobId, String changeValue) throws BundleEngineException {
081            try {
082                BundleJobChangeXCommand change = new BundleJobChangeXCommand(jobId, changeValue);
083                change.call();
084            }
085            catch (CommandException ex) {
086                throw new BundleEngineException(ex);
087            }
088        }
089    
090        /* (non-Javadoc)
091         * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
092         */
093        @Override
094        public String dryRunSubmit(Configuration conf) throws BundleEngineException {
095            BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf, getAuthToken());
096            try {
097                String jobId = submit.call();
098                return jobId;
099            }
100            catch (CommandException ex) {
101                throw new BundleEngineException(ex);
102            }
103        }
104    
105        /* (non-Javadoc)
106         * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
107         */
108        @Override
109        public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException {
110            throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine"));
111        }
112    
113        public BundleJobBean getBundleJob(String jobId) throws BundleEngineException {
114            try {
115                return new BundleJobXCommand(jobId).call();
116            }
117            catch (CommandException ex) {
118                throw new BundleEngineException(ex);
119            }
120        }
121    
122        /* (non-Javadoc)
123         * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int)
124         */
125        @Override
126        public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length) throws BundleEngineException {
127            throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine"));
128        }
129    
130        /* (non-Javadoc)
131         * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
132         */
133        @Override
134        public String getDefinition(String jobId) throws BundleEngineException {
135            BundleJobBean job;
136            try {
137                job = new BundleJobXCommand(jobId).call();
138            }
139            catch (CommandException ex) {
140                throw new BundleEngineException(ex);
141            }
142            return job.getOrigJobXml();
143        }
144    
145        /* (non-Javadoc)
146         * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
147         */
148        @Override
149        public WorkflowJob getJob(String jobId) throws BundleEngineException {
150            throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
151        }
152    
153        /* (non-Javadoc)
154         * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
155         */
156        @Override
157        public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException {
158            throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
159        }
160    
161        /* (non-Javadoc)
162         * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
163         */
164        @Override
165        public String getJobIdForExternalId(String externalId) throws BundleEngineException {
166            return null;
167        }
168    
169        /* (non-Javadoc)
170         * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
171         */
172        @Override
173        public void kill(String jobId) throws BundleEngineException {
174            try {
175                new BundleKillXCommand(jobId).call();
176            }
177            catch (CommandException e) {
178                throw new BundleEngineException(e);
179            }
180        }
181    
182        /* (non-Javadoc)
183         * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration)
184         */
185        @Override
186        @Deprecated
187        public void reRun(String jobId, Configuration conf) throws BundleEngineException {
188            throw new BundleEngineException(new XException(ErrorCode.E0301, "rerun"));
189        }
190    
191        /**
192         * Rerun Bundle actions for given rerunType
193         *
194         * @param jobId bundle job id
195         * @param coordScope the rerun scope for coordinator job names separated by ","
196         * @param dateScope the rerun scope for coordinator nominal times separated by ","
197         * @param refresh true if user wants to refresh input/outpur dataset urls
198         * @param noCleanup false if user wants to cleanup output events for given rerun actions
199         * @throws BaseEngineException thrown if failed to rerun
200         */
201        public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
202                throws BaseEngineException {
203            try {
204                new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call();
205            }
206            catch (CommandException ex) {
207                throw new BaseEngineException(ex);
208            }
209        }
210    
211        /* (non-Javadoc)
212         * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
213         */
214        @Override
215        public void resume(String jobId) throws BundleEngineException {
216            BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId);
217            try {
218                resume.call();
219            }
220            catch (CommandException ex) {
221                throw new BundleEngineException(ex);
222            }
223        }
224    
225        /* (non-Javadoc)
226         * @see org.apache.oozie.BaseEngine#start(java.lang.String)
227         */
228        @Override
229        public void start(String jobId) throws BundleEngineException {
230            try {
231                new BundleStartXCommand(jobId).call();
232            }
233            catch (CommandException e) {
234                throw new BundleEngineException(e);
235            }
236        }
237    
238        /* (non-Javadoc)
239         * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String, java.io.Writer)
240         */
241        @Override
242        public void streamLog(String jobId, Writer writer) throws IOException, BundleEngineException {
243            XLogStreamer.Filter filter = new XLogStreamer.Filter();
244            filter.setParameter(DagXLogInfoService.JOB, jobId);
245    
246            BundleJobBean job;
247            try {
248                job = new BundleJobXCommand(jobId).call();
249            }
250            catch (CommandException ex) {
251                throw new BundleEngineException(ex);
252            }
253    
254            Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
255        }
256    
257        /* (non-Javadoc)
258         * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)
259         */
260        @Override
261        public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException {
262            try {
263                String jobId = new BundleSubmitXCommand(conf, getAuthToken()).call();
264    
265                if (startJob) {
266                    start(jobId);
267                }
268                return jobId;
269            }
270            catch (CommandException ex) {
271                throw new BundleEngineException(ex);
272            }
273        }
274    
275        /* (non-Javadoc)
276         * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
277         */
278        @Override
279        public void suspend(String jobId) throws BundleEngineException {
280            BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId);
281            try {
282                suspend.call();
283            }
284            catch (CommandException ex) {
285                throw new BundleEngineException(ex);
286            }
287        }
288    
289        private static final Set<String> FILTER_NAMES = new HashSet<String>();
290    
291        static {
292            FILTER_NAMES.add(OozieClient.FILTER_USER);
293            FILTER_NAMES.add(OozieClient.FILTER_NAME);
294            FILTER_NAMES.add(OozieClient.FILTER_GROUP);
295            FILTER_NAMES.add(OozieClient.FILTER_STATUS);
296            FILTER_NAMES.add(OozieClient.FILTER_ID);
297        }
298    
299        /**
300         * Get bundle jobs
301         *
302         * @param filter the filter string
303         * @param start start location for paging
304         * @param len total length to get
305         * @return bundle job info
306         * @throws BundleEngineException thrown if failed to get bundle job info
307         */
308        public BundleJobInfo getBundleJobs(String filter, int start, int len) throws BundleEngineException {
309            Map<String, List<String>> filterList = parseFilter(filter);
310    
311            try {
312                return new BundleJobsXCommand(filterList, start, len).call();
313            }
314            catch (CommandException ex) {
315                throw new BundleEngineException(ex);
316            }
317        }
318    
319        /**
320         * Parse filter string to a map with key = filter name and values = filter values
321         *
322         * @param filter the filter string
323         * @return filter key and value map
324         * @throws CoordinatorEngineException thrown if failed to parse filter string
325         */
326        private Map<String, List<String>> parseFilter(String filter) throws BundleEngineException {
327            Map<String, List<String>> map = new HashMap<String, List<String>>();
328            if (filter != null) {
329                StringTokenizer st = new StringTokenizer(filter, ";");
330                while (st.hasMoreTokens()) {
331                    String token = st.nextToken();
332                    if (token.contains("=")) {
333                        String[] pair = token.split("=");
334                        if (pair.length != 2) {
335                            throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
336                        }
337                        if (!FILTER_NAMES.contains(pair[0])) {
338                            throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
339                                    pair[0]));
340                        }
341                        if (pair[0].equals("status")) {
342                            try {
343                                Job.Status.valueOf(pair[1]);
344                            }
345                            catch (IllegalArgumentException ex) {
346                                throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format(
347                                        "invalid status [{0}]", pair[1]));
348                            }
349                        }
350                        List<String> list = map.get(pair[0]);
351                        if (list == null) {
352                            list = new ArrayList<String>();
353                            map.put(pair[0], list);
354                        }
355                        list.add(pair[1]);
356                    }
357                    else {
358                        throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
359                    }
360                }
361            }
362            return map;
363        }
364    
365        /**
366         * Get bulk job response
367         *
368         * @param filter the filter string
369         * @param start start location for paging
370         * @param len total length to get
371         * @return bulk job info
372         * @throws BundleEngineException thrown if failed to get bulk job info
373         */
374        public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException {
375            Map<String,List<String>> bulkRequestMap = parseBulkFilter(bulkFilter);
376            try {
377                return new BulkJobsXCommand(bulkRequestMap, start, len).call();
378            }
379            catch (CommandException ex) {
380                throw new BundleEngineException(ex);
381            }
382        }
383    
384        /**
385         * Parse filter string to a map with key = filter name and values = filter values
386         * Allowed keys are defined as constants on top
387         *
388         * @param filter the filter string
389         * @return filter key-value pair map
390         * @throws BundleEngineException thrown if failed to parse filter string
391         */
392        public static Map<String,List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException {
393    
394            Map<String,List<String>> bulkFilter = new HashMap<String,List<String>>();
395            // Functionality can be extended to different job levels - TODO extend filter parser and query
396            // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL
397            if (bulkFilter != null) {
398                StringTokenizer st = new StringTokenizer(bulkParams, ";");
399                while (st.hasMoreTokens()) {
400                    String token = st.nextToken();
401                    if (token.contains("=")) {
402                        String[] pair = token.split("=");
403                        if (pair.length != 2) {
404                            throw new BundleEngineException(ErrorCode.E0420, token,
405                                    "elements must be name=value pairs");
406                        }
407                        pair[0] = pair[0].toLowerCase();
408                        String[] values = pair[1].split(",");
409                        if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) {
410                            throw new BundleEngineException(ErrorCode.E0420, token, XLog.format("invalid parameter name [{0}]",
411                                    pair[0]));
412                        }
413                        // special check and processing for time related params
414                        if (pair[0].contains("time")) {
415                            try {
416                                DateUtils.parseDateUTC(pair[1]);
417                            }
418                            catch (ParseException e) {
419                                throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
420                                        "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1],
421                                        DateUtils.ISO8601_UTC_MASK));
422                            }
423                        }
424                        // special check for action status param
425                        // TODO: when extended for levels other than coord action, check against corresponding level's Status values
426                        if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) {
427                            for(String value : values) {
428                                try {
429                                    CoordinatorAction.Status.valueOf(value);
430                                }
431                                catch (IllegalArgumentException ex) {
432                                    throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
433                                            "invalid action status [{0}]", value));
434                                }
435                            }
436                        }
437                        // eventually adding into map for all cases e.g. names, times, status
438                        List<String> list = bulkFilter.get(pair[0]);
439                        if (list == null) {
440                            list = new ArrayList<String>();
441                            bulkFilter.put(pair[0], list);
442                        }
443                        for(String value : values) {
444                            value = value.trim();
445                            if(value.isEmpty()) {
446                                throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace");
447                            }
448                            list.add(value);
449                        }
450                    } else {
451                        throw new BundleEngineException(ErrorCode.E0420, token, "elements must be name=value pairs");
452                    }
453                }
454                if(!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME)) {
455                    throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE_NAME);
456                }
457            }
458            return bulkFilter;
459        }
460    }