001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import java.io.IOException;
021    import java.io.Writer;
022    import java.text.ParseException;
023    import java.util.ArrayList;
024    import java.util.Date;
025    import java.util.HashMap;
026    import java.util.HashSet;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.StringTokenizer;
031    
032    import org.apache.hadoop.conf.Configuration;
033    import org.apache.oozie.client.CoordinatorAction;
034    import org.apache.oozie.client.CoordinatorJob;
035    import org.apache.oozie.client.Job;
036    import org.apache.oozie.client.OozieClient;
037    import org.apache.oozie.client.WorkflowJob;
038    import org.apache.oozie.client.rest.BulkResponseImpl;
039    import org.apache.oozie.command.BulkJobsXCommand;
040    import org.apache.oozie.command.CommandException;
041    import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
042    import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
043    import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
044    import org.apache.oozie.command.bundle.BundleJobXCommand;
045    import org.apache.oozie.command.bundle.BundleJobsXCommand;
046    import org.apache.oozie.command.bundle.BundleKillXCommand;
047    import org.apache.oozie.command.bundle.BundleRerunXCommand;
048    import org.apache.oozie.command.bundle.BundleStartXCommand;
049    import org.apache.oozie.command.bundle.BundleSubmitXCommand;
050    import org.apache.oozie.service.DagXLogInfoService;
051    import org.apache.oozie.service.Services;
052    import org.apache.oozie.service.XLogService;
053    import org.apache.oozie.util.DateUtils;
054    import org.apache.oozie.util.ParamChecker;
055    import org.apache.oozie.util.XLog;
056    import org.apache.oozie.util.XLogStreamer;
057    
058    import com.google.common.annotations.VisibleForTesting;
059    
060    public class BundleEngine extends BaseEngine {
061        /**
062         * Create a system Bundle engine, with no user and no group.
063         */
064        public BundleEngine() {
065        }
066    
067        /**
068         * Create a Bundle engine to perform operations on behave of a user.
069         *
070         * @param user user name.
071         */
072        public BundleEngine(String user) {
073            this.user = ParamChecker.notEmpty(user, "user");
074        }
075    
076        /* (non-Javadoc)
077         * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
078         */
079        @Override
080        public void change(String jobId, String changeValue) throws BundleEngineException {
081            try {
082                BundleJobChangeXCommand change = new BundleJobChangeXCommand(jobId, changeValue);
083                change.call();
084            }
085            catch (CommandException ex) {
086                throw new BundleEngineException(ex);
087            }
088        }
089    
090        /* (non-Javadoc)
091         * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
092         */
093        @Override
094        public String dryRunSubmit(Configuration conf) throws BundleEngineException {
095            BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf);
096            try {
097                String jobId = submit.call();
098                return jobId;
099            }
100            catch (CommandException ex) {
101                throw new BundleEngineException(ex);
102            }
103        }
104    
105        /* (non-Javadoc)
106         * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
107         */
108        @Override
109        public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException {
110            throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine"));
111        }
112    
113        public BundleJobBean getBundleJob(String jobId) throws BundleEngineException {
114            try {
115                return new BundleJobXCommand(jobId).call();
116            }
117            catch (CommandException ex) {
118                throw new BundleEngineException(ex);
119            }
120        }
121    
122        /* (non-Javadoc)
123         * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int)
124         */
125        @Override
126        public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
127                throws BundleEngineException {
128            throw new BundleEngineException(new XException(ErrorCode.E0301,
129                    "cannot get a coordinator job from BundleEngine"));
130        }
131    
132        /* (non-Javadoc)
133         * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
134         */
135        @Override
136        public String getDefinition(String jobId) throws BundleEngineException {
137            BundleJobBean job;
138            try {
139                job = new BundleJobXCommand(jobId).call();
140            }
141            catch (CommandException ex) {
142                throw new BundleEngineException(ex);
143            }
144            return job.getOrigJobXml();
145        }
146    
147        /* (non-Javadoc)
148         * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
149         */
150        @Override
151        public WorkflowJob getJob(String jobId) throws BundleEngineException {
152            throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
153        }
154    
155        /* (non-Javadoc)
156         * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
157         */
158        @Override
159        public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException {
160            throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
161        }
162    
163        /* (non-Javadoc)
164         * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
165         */
166        @Override
167        public String getJobIdForExternalId(String externalId) throws BundleEngineException {
168            return null;
169        }
170    
171        /* (non-Javadoc)
172         * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
173         */
174        @Override
175        public void kill(String jobId) throws BundleEngineException {
176            try {
177                new BundleKillXCommand(jobId).call();
178            }
179            catch (CommandException e) {
180                throw new BundleEngineException(e);
181            }
182        }
183    
184        /* (non-Javadoc)
185         * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration)
186         */
187        @Override
188        @Deprecated
189        public void reRun(String jobId, Configuration conf) throws BundleEngineException {
190            throw new BundleEngineException(new XException(ErrorCode.E0301, "rerun"));
191        }
192    
193        /**
194         * Rerun Bundle actions for given rerunType
195         *
196         * @param jobId bundle job id
197         * @param coordScope the rerun scope for coordinator job names separated by ","
198         * @param dateScope the rerun scope for coordinator nominal times separated by ","
199         * @param refresh true if user wants to refresh input/outpur dataset urls
200         * @param noCleanup false if user wants to cleanup output events for given rerun actions
201         * @throws BaseEngineException thrown if failed to rerun
202         */
203        public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
204                throws BaseEngineException {
205            try {
206                new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call();
207            }
208            catch (CommandException ex) {
209                throw new BaseEngineException(ex);
210            }
211        }
212    
213        /* (non-Javadoc)
214         * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
215         */
216        @Override
217        public void resume(String jobId) throws BundleEngineException {
218            BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId);
219            try {
220                resume.call();
221            }
222            catch (CommandException ex) {
223                throw new BundleEngineException(ex);
224            }
225        }
226    
227        /* (non-Javadoc)
228         * @see org.apache.oozie.BaseEngine#start(java.lang.String)
229         */
230        @Override
231        public void start(String jobId) throws BundleEngineException {
232            try {
233                new BundleStartXCommand(jobId).call();
234            }
235            catch (CommandException e) {
236                throw new BundleEngineException(e);
237            }
238        }
239    
240        /* (non-Javadoc)
241         * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String, java.io.Writer)
242         */
243        @Override
244        public void streamLog(String jobId, Writer writer) throws IOException, BundleEngineException {
245            XLogStreamer.Filter filter = new XLogStreamer.Filter();
246            filter.setParameter(DagXLogInfoService.JOB, jobId);
247    
248            BundleJobBean job;
249            try {
250                job = new BundleJobXCommand(jobId).call();
251            }
252            catch (CommandException ex) {
253                throw new BundleEngineException(ex);
254            }
255    
256            Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
257        }
258    
259        /* (non-Javadoc)
260         * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)
261         */
262        @Override
263        public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException {
264            try {
265                String jobId = new BundleSubmitXCommand(conf).call();
266    
267                if (startJob) {
268                    start(jobId);
269                }
270                return jobId;
271            }
272            catch (CommandException ex) {
273                throw new BundleEngineException(ex);
274            }
275        }
276    
277        /* (non-Javadoc)
278         * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
279         */
280        @Override
281        public void suspend(String jobId) throws BundleEngineException {
282            BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId);
283            try {
284                suspend.call();
285            }
286            catch (CommandException ex) {
287                throw new BundleEngineException(ex);
288            }
289        }
290    
291        private static final Set<String> FILTER_NAMES = new HashSet<String>();
292    
293        static {
294            FILTER_NAMES.add(OozieClient.FILTER_USER);
295            FILTER_NAMES.add(OozieClient.FILTER_NAME);
296            FILTER_NAMES.add(OozieClient.FILTER_GROUP);
297            FILTER_NAMES.add(OozieClient.FILTER_STATUS);
298            FILTER_NAMES.add(OozieClient.FILTER_ID);
299        }
300    
301        /**
302         * Get bundle jobs
303         *
304         * @param filter the filter string
305         * @param start start location for paging
306         * @param len total length to get
307         * @return bundle job info
308         * @throws BundleEngineException thrown if failed to get bundle job info
309         */
310        public BundleJobInfo getBundleJobs(String filter, int start, int len) throws BundleEngineException {
311            Map<String, List<String>> filterList = parseFilter(filter);
312    
313            try {
314                return new BundleJobsXCommand(filterList, start, len).call();
315            }
316            catch (CommandException ex) {
317                throw new BundleEngineException(ex);
318            }
319        }
320    
321        /**
322         * Parse filter string to a map with key = filter name and values = filter values
323         *
324         * @param filter the filter string
325         * @return filter key and value map
326         * @throws CoordinatorEngineException thrown if failed to parse filter string
327         */
328        @VisibleForTesting
329        Map<String, List<String>> parseFilter(String filter) throws BundleEngineException {
330            Map<String, List<String>> map = new HashMap<String, List<String>>();
331            if (filter != null) {
332                StringTokenizer st = new StringTokenizer(filter, ";");
333                while (st.hasMoreTokens()) {
334                    String token = st.nextToken();
335                    if (token.contains("=")) {
336                        String[] pair = token.split("=");
337                        if (pair.length != 2) {
338                            throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
339                        }
340                        if (!FILTER_NAMES.contains(pair[0])) {
341                            throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
342                                    pair[0]));
343                        }
344                        if (pair[0].equals("status")) {
345                            try {
346                                Job.Status.valueOf(pair[1]);
347                            }
348                            catch (IllegalArgumentException ex) {
349                                throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format(
350                                        "invalid status [{0}]", pair[1]));
351                            }
352                        }
353                        List<String> list = map.get(pair[0]);
354                        if (list == null) {
355                            list = new ArrayList<String>();
356                            map.put(pair[0], list);
357                        }
358                        list.add(pair[1]);
359                    }
360                    else {
361                        throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
362                    }
363                }
364            }
365            return map;
366        }
367    
368        /**
369         * Get bulk job response
370         *
371         * @param filter the filter string
372         * @param start start location for paging
373         * @param len total length to get
374         * @return bulk job info
375         * @throws BundleEngineException thrown if failed to get bulk job info
376         */
377        public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException {
378            Map<String,List<String>> bulkRequestMap = parseBulkFilter(bulkFilter);
379            try {
380                return new BulkJobsXCommand(bulkRequestMap, start, len).call();
381            }
382            catch (CommandException ex) {
383                throw new BundleEngineException(ex);
384            }
385        }
386    
387        /**
388         * Parse filter string to a map with key = filter name and values = filter values
389         * Allowed keys are defined as constants on top
390         *
391         * @param filter the filter string
392         * @return filter key-value pair map
393         * @throws BundleEngineException thrown if failed to parse filter string
394         */
395        public static Map<String,List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException {
396    
397            Map<String,List<String>> bulkFilter = new HashMap<String,List<String>>();
398            // Functionality can be extended to different job levels - TODO extend filter parser and query
399            // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL
400            if (bulkFilter != null) {
401                StringTokenizer st = new StringTokenizer(bulkParams, ";");
402                while (st.hasMoreTokens()) {
403                    String token = st.nextToken();
404                    if (token.contains("=")) {
405                        String[] pair = token.split("=");
406                        if (pair.length != 2) {
407                            throw new BundleEngineException(ErrorCode.E0420, token,
408                                    "elements must be name=value pairs");
409                        }
410                        pair[0] = pair[0].toLowerCase();
411                        String[] values = pair[1].split(",");
412                        if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) {
413                            throw new BundleEngineException(ErrorCode.E0420, token, XLog.format("invalid parameter name [{0}]",
414                                    pair[0]));
415                        }
416                        // special check and processing for time related params
417                        if (pair[0].contains("time")) {
418                            try {
419                                DateUtils.parseDateUTC(pair[1]);
420                            }
421                            catch (ParseException e) {
422                                throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
423                                        "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1],
424                                        DateUtils.ISO8601_UTC_MASK));
425                            }
426                        }
427                        // special check for action status param
428                        // TODO: when extended for levels other than coord action, check against corresponding level's Status values
429                        if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) {
430                            for(String value : values) {
431                                try {
432                                    CoordinatorAction.Status.valueOf(value);
433                                }
434                                catch (IllegalArgumentException ex) {
435                                    throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
436                                            "invalid action status [{0}]", value));
437                                }
438                            }
439                        }
440                        // eventually adding into map for all cases e.g. names, times, status
441                        List<String> list = bulkFilter.get(pair[0]);
442                        if (list == null) {
443                            list = new ArrayList<String>();
444                            bulkFilter.put(pair[0], list);
445                        }
446                        for(String value : values) {
447                            value = value.trim();
448                            if(value.isEmpty()) {
449                                throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace");
450                            }
451                            list.add(value);
452                        }
453                    } else {
454                        throw new BundleEngineException(ErrorCode.E0420, token, "elements must be name=value pairs");
455                    }
456                }
457                if(!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME)) {
458                    throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE_NAME);
459                }
460            }
461            return bulkFilter;
462        }
463    }