001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie;
020
021import java.io.IOException;
022import java.io.Writer;
023import java.text.ParseException;
024import java.util.ArrayList;
025import java.util.Date;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.StringTokenizer;
030
031import javax.servlet.ServletException;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.oozie.client.CoordinatorAction;
035import org.apache.oozie.client.CoordinatorJob;
036import org.apache.oozie.client.WorkflowJob;
037import org.apache.oozie.client.rest.BulkResponseImpl;
038import org.apache.oozie.command.BulkJobsXCommand;
039import org.apache.oozie.command.CommandException;
040import org.apache.oozie.command.OperationType;
041import org.apache.oozie.command.bundle.BulkBundleXCommand;
042import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
043import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
044import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
045import org.apache.oozie.command.bundle.BundleJobXCommand;
046import org.apache.oozie.command.bundle.BundleJobsXCommand;
047import org.apache.oozie.command.bundle.BundleKillXCommand;
048import org.apache.oozie.command.bundle.BundleRerunXCommand;
049import org.apache.oozie.command.bundle.BundleSLAAlertsDisableXCommand;
050import org.apache.oozie.command.bundle.BundleSLAAlertsEnableXCommand;
051import org.apache.oozie.command.bundle.BundleSLAChangeXCommand;
052import org.apache.oozie.command.bundle.BundleStartXCommand;
053import org.apache.oozie.command.bundle.BundleSubmitXCommand;
054import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
055import org.apache.oozie.executor.jpa.JPAExecutorException;
056import org.apache.oozie.service.DagXLogInfoService;
057import org.apache.oozie.util.DateUtils;
058import org.apache.oozie.util.JobUtils;
059import org.apache.oozie.util.JobsFilterUtils;
060import org.apache.oozie.util.ParamChecker;
061import org.apache.oozie.util.XLog;
062import org.apache.oozie.util.XLogAuditFilter;
063import org.apache.oozie.util.XLogFilter;
064import org.apache.oozie.util.XLogUserFilterParam;
065
066import com.google.common.annotations.VisibleForTesting;
067
068public class BundleEngine extends BaseEngine {
069    /**
070     * Create a system Bundle engine, with no user and no group.
071     */
072    public BundleEngine() {
073    }
074
075    /**
076     * Create a Bundle engine to perform operations on behave of a user.
077     *
078     * @param user user name.
079     */
080    public BundleEngine(String user) {
081        this.user = ParamChecker.notEmpty(user, "user");
082    }
083
084    /* (non-Javadoc)
085     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
086     */
087    @Override
088    public void change(String jobId, String changeValue) throws BundleEngineException {
089        try {
090            BundleJobChangeXCommand change = new BundleJobChangeXCommand(jobId, changeValue);
091            change.call();
092        }
093        catch (CommandException ex) {
094            throw new BundleEngineException(ex);
095        }
096    }
097
098    /* (non-Javadoc)
099     * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
100     */
101    @Override
102    public String dryRunSubmit(Configuration conf) throws BundleEngineException {
103        BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf);
104        try {
105            String jobId = submit.call();
106            return jobId;
107        }
108        catch (CommandException ex) {
109            throw new BundleEngineException(ex);
110        }
111    }
112
113    /* (non-Javadoc)
114     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
115     */
116    @Override
117    public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException {
118        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine"));
119    }
120
121    public BundleJobBean getBundleJob(String jobId) throws BundleEngineException {
122        try {
123            return new BundleJobXCommand(jobId).call();
124        }
125        catch (CommandException ex) {
126            throw new BundleEngineException(ex);
127        }
128    }
129
130    /* (non-Javadoc)
131     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int)
132     */
133    @Override
134    public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
135            throws BundleEngineException {
136        throw new BundleEngineException(new XException(ErrorCode.E0301,
137                "cannot get a coordinator job from BundleEngine"));
138    }
139
140    /* (non-Javadoc)
141     * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
142     */
143    @Override
144    public String getDefinition(String jobId) throws BundleEngineException {
145        BundleJobBean job;
146        try {
147            job = new BundleJobXCommand(jobId).call();
148        }
149        catch (CommandException ex) {
150            throw new BundleEngineException(ex);
151        }
152        return job.getOrigJobXml();
153    }
154
155    /* (non-Javadoc)
156     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
157     */
158    @Override
159    public WorkflowJob getJob(String jobId) throws BundleEngineException {
160        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
161    }
162
163    /* (non-Javadoc)
164     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
165     */
166    @Override
167    public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException {
168        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
169    }
170
171    /* (non-Javadoc)
172     * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
173     */
174    @Override
175    public String getJobIdForExternalId(String externalId) throws BundleEngineException {
176        return null;
177    }
178
179    /* (non-Javadoc)
180     * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
181     */
182    @Override
183    public void kill(String jobId) throws BundleEngineException {
184        try {
185            new BundleKillXCommand(jobId).call();
186        }
187        catch (CommandException e) {
188            throw new BundleEngineException(e);
189        }
190    }
191
192    /* (non-Javadoc)
193     * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration)
194     */
195    @Override
196    @Deprecated
197    public void reRun(String jobId, Configuration conf) throws BundleEngineException {
198        throw new BundleEngineException(new XException(ErrorCode.E0301, "rerun"));
199    }
200
201    /**
202     * Rerun Bundle actions for given rerunType
203     *
204     * @param jobId bundle job id
205     * @param coordScope the rerun scope for coordinator job names separated by ","
206     * @param dateScope the rerun scope for coordinator nominal times separated by ","
207     * @param refresh true if user wants to refresh input/outpur dataset urls
208     * @param noCleanup false if user wants to cleanup output events for given rerun actions
209     * @throws BaseEngineException thrown if failed to rerun
210     */
211    public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
212            throws BaseEngineException {
213        try {
214            new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call();
215        }
216        catch (CommandException ex) {
217            throw new BaseEngineException(ex);
218        }
219    }
220
221    /* (non-Javadoc)
222     * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
223     */
224    @Override
225    public void resume(String jobId) throws BundleEngineException {
226        BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId);
227        try {
228            resume.call();
229        }
230        catch (CommandException ex) {
231            throw new BundleEngineException(ex);
232        }
233    }
234
235    /* (non-Javadoc)
236     * @see org.apache.oozie.BaseEngine#start(java.lang.String)
237     */
238    @Override
239    public void start(String jobId) throws BundleEngineException {
240        try {
241            new BundleStartXCommand(jobId).call();
242        }
243        catch (CommandException e) {
244            throw new BundleEngineException(e);
245        }
246    }
247
248    @Override
249    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
250            BundleEngineException {
251        streamJobLog(jobId, writer, params, LOG_TYPE.LOG);
252    }
253
254    @Override
255    public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
256            BundleEngineException {
257        streamJobLog(jobId, writer, params, LOG_TYPE.ERROR_LOG);
258    }
259
260    @Override
261    public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
262            BundleEngineException {
263        try {
264            streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(params)), jobId, writer, params, LOG_TYPE.AUDIT_LOG);
265        }
266        catch (CommandException e) {
267            throw new IOException(e);
268        }
269    }
270
271    private void streamJobLog(String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
272            throws IOException, BundleEngineException {
273        try {
274            streamJobLog(new XLogFilter(new XLogUserFilterParam(params)), jobId, writer, params, logType);
275        }
276        catch (Exception e) {
277            throw new IOException(e);
278        }
279    }
280
281    private void streamJobLog(XLogFilter filter, String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
282            throws IOException, BundleEngineException {
283        try {
284            BundleJobBean job;
285            filter.setParameter(DagXLogInfoService.JOB, jobId);
286            job = new BundleJobXCommand(jobId).call();
287            Date lastTime = null;
288            if (job.isTerminalStatus()) {
289                lastTime = job.getLastModifiedTime();
290            }
291            if (lastTime == null) {
292                lastTime = new Date();
293            }
294            fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
295        }
296        catch (Exception ex) {
297            throw new IOException(ex);
298        }
299    }
300
301    /* (non-Javadoc)
302     * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)
303     */
304    @Override
305    public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException {
306        try {
307            String jobId = new BundleSubmitXCommand(conf).call();
308
309            if (startJob) {
310                start(jobId);
311            }
312            return jobId;
313        }
314        catch (CommandException ex) {
315            throw new BundleEngineException(ex);
316        }
317    }
318
319    /* (non-Javadoc)
320     * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
321     */
322    @Override
323    public void suspend(String jobId) throws BundleEngineException {
324        BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId);
325        try {
326            suspend.call();
327        }
328        catch (CommandException ex) {
329            throw new BundleEngineException(ex);
330        }
331    }
332
333    /**
334     * Get bundle jobs
335     *
336     * @param filter the filter string
337     * @param start start location for paging
338     * @param len total length to get
339     * @return bundle job info
340     * @throws BundleEngineException thrown if failed to get bundle job info
341     */
342    public BundleJobInfo getBundleJobs(String filter, int start, int len) throws BundleEngineException {
343        Map<String, List<String>> filterList = parseFilter(filter);
344
345        try {
346            return new BundleJobsXCommand(filterList, start, len).call();
347        }
348        catch (CommandException ex) {
349            throw new BundleEngineException(ex);
350        }
351    }
352
353    /**
354     * Parse filter string to a map with key = filter name and values = filter values
355     *
356     * @param filter the filter string
357     * @return filter key and value map
358     * @throws CoordinatorEngineException thrown if failed to parse filter string
359     */
360    @VisibleForTesting
361    Map<String, List<String>> parseFilter(String filter) throws BundleEngineException {
362        try {
363            return JobsFilterUtils.parseFilter(filter);
364        }
365        catch (ServletException ex) {
366            throw new BundleEngineException(ErrorCode.E0420, filter, ex.getMessage());
367        }
368    }
369
370    /**
371     * Get bulk job response
372     *
373     * @param bulkFilter the filter string
374     * @param start start location for paging
375     * @param len total length to get
376     * @return bulk job info
377     * @throws BundleEngineException thrown if failed to get bulk job info
378     */
379    public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException {
380        Map<String,List<String>> bulkRequestMap = parseBulkFilter(bulkFilter);
381        try {
382            return new BulkJobsXCommand(bulkRequestMap, start, len).call();
383        }
384        catch (CommandException ex) {
385            throw new BundleEngineException(ex);
386        }
387    }
388
389    /**
390     * Parse filter string to a map with key = filter name and values = filter values
391     * Allowed keys are defined as constants on top
392     *
393     * @param bulkParams the filter string
394     * @return filter key-value pair map
395     * @throws BundleEngineException thrown if failed to parse filter string
396     */
397    public static Map<String,List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException {
398
399        Map<String,List<String>> bulkFilter = new HashMap<String,List<String>>();
400        // Functionality can be extended to different job levels - TODO extend filter parser and query
401        // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL
402        if (bulkFilter != null) {
403            StringTokenizer st = new StringTokenizer(bulkParams, ";");
404            while (st.hasMoreTokens()) {
405                String token = st.nextToken();
406                if (token.contains("=")) {
407                    String[] pair = token.split("=");
408                    if (pair.length != 2) {
409                        throw new BundleEngineException(ErrorCode.E0420, token,
410                                "elements must be semicolon-separated name=value pairs");
411                    }
412                    pair[0] = pair[0].toLowerCase();
413                    String[] values = pair[1].split(",");
414                    if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) {
415                        throw new BundleEngineException(ErrorCode.E0420, token, XLog.format("invalid parameter name [{0}]",
416                                pair[0]));
417                    }
418                    // special check and processing for time related params
419                    if (pair[0].contains("time")) {
420                        try {
421                            DateUtils.parseDateUTC(pair[1]);
422                        }
423                        catch (ParseException e) {
424                            throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
425                                    "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1],
426                                    DateUtils.ISO8601_UTC_MASK));
427                        }
428                    }
429                    // special check for action status param
430                    // TODO: when extended for levels other than coord action, check against corresponding level's Status values
431                    if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) {
432                        for(String value : values) {
433                            try {
434                                CoordinatorAction.Status.valueOf(value);
435                            }
436                            catch (IllegalArgumentException ex) {
437                                throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
438                                        "invalid action status [{0}]", value));
439                            }
440                        }
441                    }
442                    // eventually adding into map for all cases e.g. names, times, status
443                    List<String> list = bulkFilter.get(pair[0]);
444                    if (list == null) {
445                        list = new ArrayList<String>();
446                        bulkFilter.put(pair[0], list);
447                    }
448                    for(String value : values) {
449                        value = value.trim();
450                        if(value.isEmpty()) {
451                            throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace");
452                        }
453                        list.add(value);
454                    }
455                } else {
456                    throw new BundleEngineException(ErrorCode.E0420, token,
457                            "elements must be semicolon-separated name=value pairs");
458                }
459            }
460            if (!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE)) {
461                throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE);
462            }
463        }
464        return bulkFilter;
465    }
466
467    /**
468     * Return the status for a Job ID
469     *
470     * @param jobId job Id.
471     * @return the job's status
472     * @throws BundleEngineException thrown if the job's status could not be obtained
473     */
474    @Override
475    public String getJobStatus(String jobId) throws BundleEngineException {
476        try {
477            BundleJobBean bundleJob = BundleJobQueryExecutor.getInstance().get(
478                    BundleJobQueryExecutor.BundleJobQuery.GET_BUNDLE_JOB_STATUS, jobId);
479            return bundleJob.getStatusStr();
480        }
481        catch (JPAExecutorException e) {
482            throw new BundleEngineException(e);
483        }
484    }
485
486    @Override
487    public void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
488        try {
489            new BundleSLAAlertsEnableXCommand(id, actions, dates, childIds).call();
490        }
491        catch (CommandException e) {
492            throw new BundleEngineException(e);
493        }
494    }
495
496    @Override
497    public void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
498        try {
499            new BundleSLAAlertsDisableXCommand(id, actions, dates, childIds).call();
500        }
501        catch (CommandException e) {
502            throw new BundleEngineException(e);
503        }
504    }
505
506    @Override
507    public void changeSLA(String id, String actions, String dates, String childIds, String newParams)
508            throws BaseEngineException {
509        Map<String, String> slaNewParams = null;
510        try {
511
512            if (newParams != null) {
513                slaNewParams = JobUtils.parseChangeValue(newParams);
514            }
515            new BundleSLAChangeXCommand(id, actions, dates, childIds, slaNewParams).call();
516        }
517        catch (CommandException e) {
518            throw new BundleEngineException(e);
519        }
520    }
521
522    /**
523     * return a list of killed Bundle job
524     *
525     * @param filter, the filter string for which the bundle jobs are killed
526     * @param start, the starting index for bundle jobs
527     * @param len, maximum number of jobs to be killed
528     * @return the list of jobs being killed
529     * @throws BundleEngineException thrown if one or more of the jobs cannot be killed
530     */
531    public BundleJobInfo killJobs(String filter, int start, int len) throws BundleEngineException {
532        try {
533            Map<String, List<String>> filterList = parseFilter(filter);
534            BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, OperationType.Kill).call();
535            if (bundleJobInfo == null) {
536                return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0);
537            }
538            return bundleJobInfo;
539        }
540        catch (CommandException ex) {
541            throw new BundleEngineException(ex);
542        }
543    }
544
545    /**
546     * return a list of suspended Bundle job
547     *
548     * @param filter, the filter string for which the bundle jobs are suspended
549     * @param start, the starting index for bundle jobs
550     * @param len, maximum number of jobs to be suspended
551     * @return the list of jobs being suspended
552     * @throws BundleEngineException thrown if one or more of the jobs cannot be suspended
553     */
554    public BundleJobInfo suspendJobs(String filter, int start, int len) throws BundleEngineException {
555        try {
556            Map<String, List<String>> filterList = parseFilter(filter);
557            BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, OperationType.Suspend).call();
558            if (bundleJobInfo == null) {
559                return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0);
560            }
561            return bundleJobInfo;
562        }
563        catch (CommandException ex) {
564            throw new BundleEngineException(ex);
565        }
566    }
567
568    /**
569     * return a list of resumed Bundle job
570     *
571     * @param filter, the filter string for which the bundle jobs are resumed
572     * @param start, the starting index for bundle jobs
573     * @param len, maximum number of jobs to be resumed
574     * @return the list of jobs being resumed
575     * @throws BundleEngineException thrown if one or more of the jobs cannot be resumed
576     */
577    public BundleJobInfo resumeJobs(String filter, int start, int len) throws BundleEngineException {
578        try {
579            Map<String, List<String>> filterList = parseFilter(filter);
580            BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, OperationType.Resume).call();
581            if (bundleJobInfo == null) {
582                return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0);
583            }
584            return bundleJobInfo;
585        }
586        catch (CommandException ex) {
587            throw new BundleEngineException(ex);
588        }
589    }
590}