001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import java.io.IOException;
021    import java.io.Writer;
022    import java.util.ArrayList;
023    import java.util.Date;
024    import java.util.HashMap;
025    import java.util.HashSet;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Set;
029    import java.util.StringTokenizer;
030    
031    import org.apache.hadoop.conf.Configuration;
032    import org.apache.oozie.client.CoordinatorJob;
033    import org.apache.oozie.client.Job;
034    import org.apache.oozie.client.OozieClient;
035    import org.apache.oozie.client.WorkflowJob;
036    import org.apache.oozie.command.CommandException;
037    import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
038    import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
039    import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
040    import org.apache.oozie.command.bundle.BundleJobXCommand;
041    import org.apache.oozie.command.bundle.BundleJobsXCommand;
042    import org.apache.oozie.command.bundle.BundleKillXCommand;
043    import org.apache.oozie.command.bundle.BundleRerunXCommand;
044    import org.apache.oozie.command.bundle.BundleStartXCommand;
045    import org.apache.oozie.command.bundle.BundleSubmitXCommand;
046    import org.apache.oozie.service.DagXLogInfoService;
047    import org.apache.oozie.service.Services;
048    import org.apache.oozie.service.XLogService;
049    import org.apache.oozie.util.ParamChecker;
050    import org.apache.oozie.util.XLog;
051    import org.apache.oozie.util.XLogStreamer;
052    
053    public class BundleEngine extends BaseEngine {
054        /**
055         * Create a system Bundle engine, with no user and no group.
056         */
057        public BundleEngine() {
058        }
059    
060        /**
061         * Create a Bundle engine to perform operations on behave of a user.
062         *
063         * @param user user name.
064         * @param authToken the authentication token.
065         */
066        public BundleEngine(String user, String authToken) {
067            this.user = ParamChecker.notEmpty(user, "user");
068            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
069        }
070    
071        /* (non-Javadoc)
072         * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
073         */
074        @Override
075        public void change(String jobId, String changeValue) throws BundleEngineException {
076            try {
077                BundleJobChangeXCommand change = new BundleJobChangeXCommand(jobId, changeValue);
078                change.call();
079            }
080            catch (CommandException ex) {
081                throw new BundleEngineException(ex);
082            }
083        }
084    
085        /* (non-Javadoc)
086         * @see org.apache.oozie.BaseEngine#dryrunSubmit(org.apache.hadoop.conf.Configuration, boolean)
087         */
088        @Override
089        public String dryrunSubmit(Configuration conf, boolean startJob) throws BundleEngineException {
090            BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf, getAuthToken());
091            try {
092                String jobId = submit.call();
093                return jobId;
094            }
095            catch (CommandException ex) {
096                throw new BundleEngineException(ex);
097            }
098        }
099    
100        /* (non-Javadoc)
101         * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
102         */
103        @Override
104        public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException {
105            throw new BundleEngineException(new XException(ErrorCode.E0301));
106        }
107    
108        public BundleJobBean getBundleJob(String jobId) throws BundleEngineException {
109            try {
110                return new BundleJobXCommand(jobId).call();
111            }
112            catch (CommandException ex) {
113                throw new BundleEngineException(ex);
114            }
115        }
116    
117        /* (non-Javadoc)
118         * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int)
119         */
120        @Override
121        public CoordinatorJob getCoordJob(String jobId, int start, int length) throws BundleEngineException {
122            throw new BundleEngineException(new XException(ErrorCode.E0301));
123        }
124    
125        /* (non-Javadoc)
126         * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
127         */
128        @Override
129        public String getDefinition(String jobId) throws BundleEngineException {
130            BundleJobBean job;
131            try {
132                job = new BundleJobXCommand(jobId).call();
133            }
134            catch (CommandException ex) {
135                throw new BundleEngineException(ex);
136            }
137            return job.getOrigJobXml();
138        }
139    
140        /* (non-Javadoc)
141         * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
142         */
143        @Override
144        public WorkflowJob getJob(String jobId) throws BundleEngineException {
145            throw new BundleEngineException(new XException(ErrorCode.E0301));
146        }
147    
148        /* (non-Javadoc)
149         * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
150         */
151        @Override
152        public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException {
153            throw new BundleEngineException(new XException(ErrorCode.E0301));
154        }
155    
156        /* (non-Javadoc)
157         * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
158         */
159        @Override
160        public String getJobIdForExternalId(String externalId) throws BundleEngineException {
161            return null;
162        }
163    
164        /* (non-Javadoc)
165         * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
166         */
167        @Override
168        public void kill(String jobId) throws BundleEngineException {
169            try {
170                new BundleKillXCommand(jobId).call();
171            }
172            catch (CommandException e) {
173                throw new BundleEngineException(e);
174            }
175        }
176    
177        /* (non-Javadoc)
178         * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration)
179         */
180        @Override
181        @Deprecated
182        public void reRun(String jobId, Configuration conf) throws BundleEngineException {
183            throw new BundleEngineException(new XException(ErrorCode.E0301));
184        }
185    
186        /**
187         * Rerun Bundle actions for given rerunType
188         *
189         * @param jobId bundle job id
190         * @param coordScope the rerun scope for coordinator job names separated by ","
191         * @param dateScope the rerun scope for coordinator nominal times separated by ","
192         * @param refresh true if user wants to refresh input/outpur dataset urls
193         * @param noCleanup false if user wants to cleanup output events for given rerun actions
194         * @throws BaseEngineException thrown if failed to rerun
195         */
196        public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
197                throws BaseEngineException {
198            try {
199                new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call();
200            }
201            catch (CommandException ex) {
202                throw new BaseEngineException(ex);
203            }
204        }
205    
206        /* (non-Javadoc)
207         * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
208         */
209        @Override
210        public void resume(String jobId) throws BundleEngineException {
211            BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId);
212            try {
213                resume.call();
214            }
215            catch (CommandException ex) {
216                throw new BundleEngineException(ex);
217            }
218        }
219    
220        /* (non-Javadoc)
221         * @see org.apache.oozie.BaseEngine#start(java.lang.String)
222         */
223        @Override
224        public void start(String jobId) throws BundleEngineException {
225            try {
226                new BundleStartXCommand(jobId).call();
227            }
228            catch (CommandException e) {
229                throw new BundleEngineException(e);
230            }
231        }
232    
233        /* (non-Javadoc)
234         * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String, java.io.Writer)
235         */
236        @Override
237        public void streamLog(String jobId, Writer writer) throws IOException, BundleEngineException {
238            XLogStreamer.Filter filter = new XLogStreamer.Filter();
239            filter.setParameter(DagXLogInfoService.JOB, jobId);
240    
241            BundleJobBean job;
242            try {
243                job = new BundleJobXCommand(jobId).call();
244            }
245            catch (CommandException ex) {
246                throw new BundleEngineException(ex);
247            }
248    
249            Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
250        }
251    
252        /* (non-Javadoc)
253         * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)
254         */
255        @Override
256        public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException {
257            try {
258                String jobId = new BundleSubmitXCommand(conf, getAuthToken()).call();
259    
260                if (startJob) {
261                    start(jobId);
262                }
263                return jobId;
264            }
265            catch (CommandException ex) {
266                throw new BundleEngineException(ex);
267            }
268        }
269    
270        /* (non-Javadoc)
271         * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
272         */
273        @Override
274        public void suspend(String jobId) throws BundleEngineException {
275            BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId);
276            try {
277                suspend.call();
278            }
279            catch (CommandException ex) {
280                throw new BundleEngineException(ex);
281            }
282        }
283    
284        private static final Set<String> FILTER_NAMES = new HashSet<String>();
285    
286        static {
287            FILTER_NAMES.add(OozieClient.FILTER_USER);
288            FILTER_NAMES.add(OozieClient.FILTER_NAME);
289            FILTER_NAMES.add(OozieClient.FILTER_GROUP);
290            FILTER_NAMES.add(OozieClient.FILTER_STATUS);
291            FILTER_NAMES.add(OozieClient.FILTER_ID);
292        }
293    
294        /**
295         * Get bundle jobs
296         *
297         * @param filterStr the filter string
298         * @param start start location for paging
299         * @param len total length to get
300         * @return bundle job info
301         * @throws BundleEngineException thrown if failed to get bundle job info
302         */
303        public BundleJobInfo getBundleJobs(String filterStr, int start, int len) throws BundleEngineException {
304            Map<String, List<String>> filter = parseFilter(filterStr);
305    
306            try {
307                return new BundleJobsXCommand(filter, start, len).call();
308            }
309            catch (CommandException ex) {
310                throw new BundleEngineException(ex);
311            }
312        }
313    
314        /**
315         * Parse filter string to a map with key = filter name and values = filter values
316         *
317         * @param filter the filter string
318         * @return filter key and value map
319         * @throws CoordinatorEngineException thrown if failed to parse filter string
320         */
321        private Map<String, List<String>> parseFilter(String filter) throws BundleEngineException {
322            Map<String, List<String>> map = new HashMap<String, List<String>>();
323            if (filter != null) {
324                StringTokenizer st = new StringTokenizer(filter, ";");
325                while (st.hasMoreTokens()) {
326                    String token = st.nextToken();
327                    if (token.contains("=")) {
328                        String[] pair = token.split("=");
329                        if (pair.length != 2) {
330                            throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
331                        }
332                        if (!FILTER_NAMES.contains(pair[0])) {
333                            throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
334                                    pair[0]));
335                        }
336                        if (pair[0].equals("status")) {
337                            try {
338                                Job.Status.valueOf(pair[1]);
339                            }
340                            catch (IllegalArgumentException ex) {
341                                throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format(
342                                        "invalid status [{0}]", pair[1]));
343                            }
344                        }
345                        List<String> list = map.get(pair[0]);
346                        if (list == null) {
347                            list = new ArrayList<String>();
348                            map.put(pair[0], list);
349                        }
350                        list.add(pair[1]);
351                    }
352                    else {
353                        throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
354                    }
355                }
356            }
357            return map;
358        }
359    
360    }