This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie;
020
021import java.io.IOException;
022import java.io.Writer;
023import java.text.ParseException;
024import java.util.ArrayList;
025import java.util.Date;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.StringTokenizer;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.oozie.BaseEngine.LOG_TYPE;
033import org.apache.oozie.client.CoordinatorAction;
034import org.apache.oozie.client.CoordinatorJob;
035import org.apache.oozie.client.WorkflowJob;
036import org.apache.oozie.client.rest.BulkResponseImpl;
037import org.apache.oozie.command.BulkJobsXCommand;
038import org.apache.oozie.command.CommandException;
039import org.apache.oozie.command.bundle.BundleSLAAlertsDisableXCommand;
040import org.apache.oozie.command.bundle.BundleSLAAlertsEnableXCommand;
041import org.apache.oozie.command.bundle.BundleSLAChangeXCommand;
042import org.apache.oozie.command.bundle.BulkBundleXCommand;
043import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
044import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
045import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
046import org.apache.oozie.command.bundle.BundleJobXCommand;
047import org.apache.oozie.command.bundle.BundleJobsXCommand;
048import org.apache.oozie.command.bundle.BundleKillXCommand;
049import org.apache.oozie.command.bundle.BundleRerunXCommand;
050import org.apache.oozie.command.bundle.BundleStartXCommand;
051import org.apache.oozie.command.bundle.BundleSubmitXCommand;
052import org.apache.oozie.command.OperationType;
053import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
054import org.apache.oozie.executor.jpa.JPAExecutorException;
055import org.apache.oozie.service.DagXLogInfoService;
056import org.apache.oozie.util.DateUtils;
057import org.apache.oozie.util.JobsFilterUtils;
058import org.apache.oozie.util.JobUtils;
059import org.apache.oozie.util.XLogAuditFilter;
060import org.apache.oozie.util.XLogFilter;
061import org.apache.oozie.util.XLogUserFilterParam;
062import org.apache.oozie.util.ParamChecker;
063import org.apache.oozie.util.XLog;
064
065import com.google.common.annotations.VisibleForTesting;
066
067import javax.servlet.ServletException;
068
069public class BundleEngine extends BaseEngine {
070    /**
071     * Create a system Bundle engine, with no user and no group.
072     */
073    public BundleEngine() {
074    }
075
076    /**
077     * Create a Bundle engine to perform operations on behave of a user.
078     *
079     * @param user user name.
080     */
081    public BundleEngine(String user) {
082        this.user = ParamChecker.notEmpty(user, "user");
083    }
084
085    /* (non-Javadoc)
086     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
087     */
088    @Override
089    public void change(String jobId, String changeValue) throws BundleEngineException {
090        try {
091            BundleJobChangeXCommand change = new BundleJobChangeXCommand(jobId, changeValue);
092            change.call();
093        }
094        catch (CommandException ex) {
095            throw new BundleEngineException(ex);
096        }
097    }
098
099    /* (non-Javadoc)
100     * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
101     */
102    @Override
103    public String dryRunSubmit(Configuration conf) throws BundleEngineException {
104        BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf);
105        try {
106            String jobId = submit.call();
107            return jobId;
108        }
109        catch (CommandException ex) {
110            throw new BundleEngineException(ex);
111        }
112    }
113
114    /* (non-Javadoc)
115     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
116     */
117    @Override
118    public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException {
119        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine"));
120    }
121
122    public BundleJobBean getBundleJob(String jobId) throws BundleEngineException {
123        try {
124            return new BundleJobXCommand(jobId).call();
125        }
126        catch (CommandException ex) {
127            throw new BundleEngineException(ex);
128        }
129    }
130
131    /* (non-Javadoc)
132     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int)
133     */
134    @Override
135    public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
136            throws BundleEngineException {
137        throw new BundleEngineException(new XException(ErrorCode.E0301,
138                "cannot get a coordinator job from BundleEngine"));
139    }
140
141    /* (non-Javadoc)
142     * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
143     */
144    @Override
145    public String getDefinition(String jobId) throws BundleEngineException {
146        BundleJobBean job;
147        try {
148            job = new BundleJobXCommand(jobId).call();
149        }
150        catch (CommandException ex) {
151            throw new BundleEngineException(ex);
152        }
153        return job.getOrigJobXml();
154    }
155
156    /* (non-Javadoc)
157     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
158     */
159    @Override
160    public WorkflowJob getJob(String jobId) throws BundleEngineException {
161        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
162    }
163
164    /* (non-Javadoc)
165     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
166     */
167    @Override
168    public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException {
169        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
170    }
171
172    /* (non-Javadoc)
173     * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
174     */
175    @Override
176    public String getJobIdForExternalId(String externalId) throws BundleEngineException {
177        return null;
178    }
179
180    /* (non-Javadoc)
181     * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
182     */
183    @Override
184    public void kill(String jobId) throws BundleEngineException {
185        try {
186            new BundleKillXCommand(jobId).call();
187        }
188        catch (CommandException e) {
189            throw new BundleEngineException(e);
190        }
191    }
192
193    /* (non-Javadoc)
194     * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration)
195     */
196    @Override
197    @Deprecated
198    public void reRun(String jobId, Configuration conf) throws BundleEngineException {
199        throw new BundleEngineException(new XException(ErrorCode.E0301, "rerun"));
200    }
201
202    /**
203     * Rerun Bundle actions for given rerunType
204     *
205     * @param jobId bundle job id
206     * @param coordScope the rerun scope for coordinator job names separated by ","
207     * @param dateScope the rerun scope for coordinator nominal times separated by ","
208     * @param refresh true if user wants to refresh input/outpur dataset urls
209     * @param noCleanup false if user wants to cleanup output events for given rerun actions
210     * @throws BaseEngineException thrown if failed to rerun
211     */
212    public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
213            throws BaseEngineException {
214        try {
215            new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call();
216        }
217        catch (CommandException ex) {
218            throw new BaseEngineException(ex);
219        }
220    }
221
222    /* (non-Javadoc)
223     * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
224     */
225    @Override
226    public void resume(String jobId) throws BundleEngineException {
227        BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId);
228        try {
229            resume.call();
230        }
231        catch (CommandException ex) {
232            throw new BundleEngineException(ex);
233        }
234    }
235
236    /* (non-Javadoc)
237     * @see org.apache.oozie.BaseEngine#start(java.lang.String)
238     */
239    @Override
240    public void start(String jobId) throws BundleEngineException {
241        try {
242            new BundleStartXCommand(jobId).call();
243        }
244        catch (CommandException e) {
245            throw new BundleEngineException(e);
246        }
247    }
248
249    @Override
250    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
251            BundleEngineException {
252        streamJobLog(jobId, writer, params, LOG_TYPE.LOG);
253    }
254
255    @Override
256    public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
257            BundleEngineException {
258        streamJobLog(jobId, writer, params, LOG_TYPE.ERROR_LOG);
259    }
260
261    @Override
262    public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
263            BundleEngineException {
264        try {
265            streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(params)), jobId, writer, params, LOG_TYPE.AUDIT_LOG);
266        }
267        catch (CommandException e) {
268            throw new IOException(e);
269        }
270    }
271
272    private void streamJobLog(String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
273            throws IOException, BundleEngineException {
274        try {
275            streamJobLog(new XLogFilter(new XLogUserFilterParam(params)), jobId, writer, params, logType);
276        }
277        catch (Exception e) {
278            throw new IOException(e);
279        }
280    }
281
282    private void streamJobLog(XLogFilter filter, String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
283            throws IOException, BundleEngineException {
284        try {
285            BundleJobBean job;
286            filter.setParameter(DagXLogInfoService.JOB, jobId);
287            job = new BundleJobXCommand(jobId).call();
288            Date lastTime = null;
289            if (job.isTerminalStatus()) {
290                lastTime = job.getLastModifiedTime();
291            }
292            if (lastTime == null) {
293                lastTime = new Date();
294            }
295            fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
296        }
297        catch (Exception ex) {
298            throw new IOException(ex);
299        }
300    }
301
302    /* (non-Javadoc)
303     * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)
304     */
305    @Override
306    public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException {
307        try {
308            String jobId = new BundleSubmitXCommand(conf).call();
309
310            if (startJob) {
311                start(jobId);
312            }
313            return jobId;
314        }
315        catch (CommandException ex) {
316            throw new BundleEngineException(ex);
317        }
318    }
319
320    /* (non-Javadoc)
321     * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
322     */
323    @Override
324    public void suspend(String jobId) throws BundleEngineException {
325        BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId);
326        try {
327            suspend.call();
328        }
329        catch (CommandException ex) {
330            throw new BundleEngineException(ex);
331        }
332    }
333
334    /**
335     * Get bundle jobs
336     *
337     * @param filter the filter string
338     * @param start start location for paging
339     * @param len total length to get
340     * @return bundle job info
341     * @throws BundleEngineException thrown if failed to get bundle job info
342     */
343    public BundleJobInfo getBundleJobs(String filter, int start, int len) throws BundleEngineException {
344        Map<String, List<String>> filterList = parseFilter(filter);
345
346        try {
347            return new BundleJobsXCommand(filterList, start, len).call();
348        }
349        catch (CommandException ex) {
350            throw new BundleEngineException(ex);
351        }
352    }
353
354    /**
355     * Parse filter string to a map with key = filter name and values = filter values
356     *
357     * @param filter the filter string
358     * @return filter key and value map
359     * @throws CoordinatorEngineException thrown if failed to parse filter string
360     */
361    @VisibleForTesting
362    Map<String, List<String>> parseFilter(String filter) throws BundleEngineException {
363        try {
364            return JobsFilterUtils.parseFilter(filter);
365        }
366        catch (ServletException ex) {
367            throw new BundleEngineException(ErrorCode.E0420, filter, ex.getMessage());
368        }
369    }
370
371    /**
372     * Get bulk job response
373     *
374     * @param bulkFilter the filter string
375     * @param start start location for paging
376     * @param len total length to get
377     * @return bulk job info
378     * @throws BundleEngineException thrown if failed to get bulk job info
379     */
380    public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException {
381        Map<String,List<String>> bulkRequestMap = parseBulkFilter(bulkFilter);
382        try {
383            return new BulkJobsXCommand(bulkRequestMap, start, len).call();
384        }
385        catch (CommandException ex) {
386            throw new BundleEngineException(ex);
387        }
388    }
389
390    /**
391     * Parse filter string to a map with key = filter name and values = filter values
392     * Allowed keys are defined as constants on top
393     *
394     * @param bulkParams the filter string
395     * @return filter key-value pair map
396     * @throws BundleEngineException thrown if failed to parse filter string
397     */
398    public static Map<String,List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException {
399
400        Map<String,List<String>> bulkFilter = new HashMap<String,List<String>>();
401        // Functionality can be extended to different job levels - TODO extend filter parser and query
402        // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL
403        if (bulkFilter != null) {
404            StringTokenizer st = new StringTokenizer(bulkParams, ";");
405            while (st.hasMoreTokens()) {
406                String token = st.nextToken();
407                if (token.contains("=")) {
408                    String[] pair = token.split("=");
409                    if (pair.length != 2) {
410                        throw new BundleEngineException(ErrorCode.E0420, token,
411                                "elements must be name=value pairs");
412                    }
413                    pair[0] = pair[0].toLowerCase();
414                    String[] values = pair[1].split(",");
415                    if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) {
416                        throw new BundleEngineException(ErrorCode.E0420, token, XLog.format("invalid parameter name [{0}]",
417                                pair[0]));
418                    }
419                    // special check and processing for time related params
420                    if (pair[0].contains("time")) {
421                        try {
422                            DateUtils.parseDateUTC(pair[1]);
423                        }
424                        catch (ParseException e) {
425                            throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
426                                    "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1],
427                                    DateUtils.ISO8601_UTC_MASK));
428                        }
429                    }
430                    // special check for action status param
431                    // TODO: when extended for levels other than coord action, check against corresponding level's Status values
432                    if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) {
433                        for(String value : values) {
434                            try {
435                                CoordinatorAction.Status.valueOf(value);
436                            }
437                            catch (IllegalArgumentException ex) {
438                                throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
439                                        "invalid action status [{0}]", value));
440                            }
441                        }
442                    }
443                    // eventually adding into map for all cases e.g. names, times, status
444                    List<String> list = bulkFilter.get(pair[0]);
445                    if (list == null) {
446                        list = new ArrayList<String>();
447                        bulkFilter.put(pair[0], list);
448                    }
449                    for(String value : values) {
450                        value = value.trim();
451                        if(value.isEmpty()) {
452                            throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace");
453                        }
454                        list.add(value);
455                    }
456                } else {
457                    throw new BundleEngineException(ErrorCode.E0420, token, "elements must be name=value pairs");
458                }
459            }
460            if (!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE)) {
461                throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE);
462            }
463        }
464        return bulkFilter;
465    }
466
467    /**
468     * Return the status for a Job ID
469     *
470     * @param jobId job Id.
471     * @return the job's status
472     * @throws BundleEngineException thrown if the job's status could not be obtained
473     */
474    @Override
475    public String getJobStatus(String jobId) throws BundleEngineException {
476        try {
477            BundleJobBean bundleJob = BundleJobQueryExecutor.getInstance().get(
478                    BundleJobQueryExecutor.BundleJobQuery.GET_BUNDLE_JOB_STATUS, jobId);
479            return bundleJob.getStatusStr();
480        }
481        catch (JPAExecutorException e) {
482            throw new BundleEngineException(e);
483        }
484    }
485
486    @Override
487    public void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
488        try {
489            new BundleSLAAlertsEnableXCommand(id, actions, dates, childIds).call();
490        }
491        catch (CommandException e) {
492            throw new BundleEngineException(e);
493        }
494    }
495
496    @Override
497    public void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
498        try {
499            new BundleSLAAlertsDisableXCommand(id, actions, dates, childIds).call();
500        }
501        catch (CommandException e) {
502            throw new BundleEngineException(e);
503        }
504    }
505
506    @Override
507    public void changeSLA(String id, String actions, String dates, String childIds, String newParams)
508            throws BaseEngineException {
509        Map<String, String> slaNewParams = null;
510        try {
511
512            if (newParams != null) {
513                slaNewParams = JobUtils.parseChangeValue(newParams);
514            }
515            new BundleSLAChangeXCommand(id, actions, dates, childIds, slaNewParams).call();
516        }
517        catch (CommandException e) {
518            throw new BundleEngineException(e);
519        }
520    }
521
522    /**
523     * return a list of killed Bundle job
524     *
525     * @param filter, the filter string for which the bundle jobs are killed
526     * @param start, the starting index for bundle jobs
527     * @param len, maximum number of jobs to be killed
528     * @return the list of jobs being killed
529     * @throws BundleEngineException thrown if one or more of the jobs cannot be killed
530     */
531    public BundleJobInfo killJobs(String filter, int start, int len) throws BundleEngineException {
532        try {
533            Map<String, List<String>> filterList = parseFilter(filter);
534            BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, OperationType.Kill).call();
535            if (bundleJobInfo == null) {
536                return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0);
537            }
538            return bundleJobInfo;
539        }
540        catch (CommandException ex) {
541            throw new BundleEngineException(ex);
542        }
543    }
544
545    /**
546     * return a list of suspended Bundle job
547     *
548     * @param filter, the filter string for which the bundle jobs are suspended
549     * @param start, the starting index for bundle jobs
550     * @param len, maximum number of jobs to be suspended
551     * @return the list of jobs being suspended
552     * @throws BundleEngineException thrown if one or more of the jobs cannot be suspended
553     */
554    public BundleJobInfo suspendJobs(String filter, int start, int len) throws BundleEngineException {
555        try {
556            Map<String, List<String>> filterList = parseFilter(filter);
557            BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, OperationType.Suspend).call();
558            if (bundleJobInfo == null) {
559                return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0);
560            }
561            return bundleJobInfo;
562        }
563        catch (CommandException ex) {
564            throw new BundleEngineException(ex);
565        }
566    }
567
568    /**
569     * return a list of resumed Bundle job
570     *
571     * @param filter, the filter string for which the bundle jobs are resumed
572     * @param start, the starting index for bundle jobs
573     * @param len, maximum number of jobs to be resumed
574     * @return the list of jobs being resumed
575     * @throws BundleEngineException thrown if one or more of the jobs cannot be resumed
576     */
577    public BundleJobInfo resumeJobs(String filter, int start, int len) throws BundleEngineException {
578        try {
579            Map<String, List<String>> filterList = parseFilter(filter);
580            BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, OperationType.Resume).call();
581            if (bundleJobInfo == null) {
582                return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0);
583            }
584            return bundleJobInfo;
585        }
586        catch (CommandException ex) {
587            throw new BundleEngineException(ex);
588        }
589    }
590}