001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie;
020
021import java.io.IOException;
022import java.io.Writer;
023import java.text.ParseException;
024import java.util.ArrayList;
025import java.util.Date;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.StringTokenizer;
030
031import javax.servlet.ServletException;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.oozie.client.CoordinatorAction;
035import org.apache.oozie.client.CoordinatorJob;
036import org.apache.oozie.client.WorkflowJob;
037import org.apache.oozie.client.rest.BulkResponseImpl;
038import org.apache.oozie.command.BulkJobsXCommand;
039import org.apache.oozie.command.CommandException;
040import org.apache.oozie.command.OperationType;
041import org.apache.oozie.command.bundle.BulkBundleXCommand;
042import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
043import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
044import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
045import org.apache.oozie.command.bundle.BundleJobXCommand;
046import org.apache.oozie.command.bundle.BundleJobsXCommand;
047import org.apache.oozie.command.bundle.BundleKillXCommand;
048import org.apache.oozie.command.bundle.BundleRerunXCommand;
049import org.apache.oozie.command.bundle.BundleSLAAlertsDisableXCommand;
050import org.apache.oozie.command.bundle.BundleSLAAlertsEnableXCommand;
051import org.apache.oozie.command.bundle.BundleSLAChangeXCommand;
052import org.apache.oozie.command.bundle.BundleStartXCommand;
053import org.apache.oozie.command.bundle.BundleSubmitXCommand;
054import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
055import org.apache.oozie.executor.jpa.JPAExecutorException;
056import org.apache.oozie.service.DagXLogInfoService;
057import org.apache.oozie.service.Services;
058import org.apache.oozie.service.XLogStreamingService;
059import org.apache.oozie.util.DateUtils;
060import org.apache.oozie.util.JobUtils;
061import org.apache.oozie.util.JobsFilterUtils;
062import org.apache.oozie.util.ParamChecker;
063import org.apache.oozie.util.XLog;
064import org.apache.oozie.util.XLogStreamer;
065
066import com.google.common.annotations.VisibleForTesting;
067
068public class BundleEngine extends BaseEngine {
069    /**
070     * Create a system Bundle engine, with no user and no group.
071     */
072    public BundleEngine() {
073    }
074
075    /**
076     * Create a Bundle engine to perform operations on behave of a user.
077     *
078     * @param user user name.
079     */
080    public BundleEngine(String user) {
081        this.user = ParamChecker.notEmpty(user, "user");
082    }
083
084    /* (non-Javadoc)
085     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
086     */
087    @Override
088    public void change(String jobId, String changeValue) throws BundleEngineException {
089        try {
090            BundleJobChangeXCommand change = new BundleJobChangeXCommand(jobId, changeValue);
091            change.call();
092        }
093        catch (CommandException ex) {
094            throw new BundleEngineException(ex);
095        }
096    }
097
098    /* (non-Javadoc)
099     * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
100     */
101    @Override
102    public String dryRunSubmit(Configuration conf) throws BundleEngineException {
103        BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf);
104        try {
105            String jobId = submit.call();
106            return jobId;
107        }
108        catch (CommandException ex) {
109            throw new BundleEngineException(ex);
110        }
111    }
112
113    /* (non-Javadoc)
114     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
115     */
116    @Override
117    public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException {
118        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine"));
119    }
120
121    public BundleJobBean getBundleJob(String jobId) throws BundleEngineException {
122        try {
123            return new BundleJobXCommand(jobId).call();
124        }
125        catch (CommandException ex) {
126            throw new BundleEngineException(ex);
127        }
128    }
129
130    /* (non-Javadoc)
131     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int)
132     */
133    @Override
134    public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
135            throws BundleEngineException {
136        throw new BundleEngineException(new XException(ErrorCode.E0301,
137                "cannot get a coordinator job from BundleEngine"));
138    }
139
140    /* (non-Javadoc)
141     * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
142     */
143    @Override
144    public String getDefinition(String jobId) throws BundleEngineException {
145        BundleJobBean job;
146        try {
147            job = new BundleJobXCommand(jobId).call();
148        }
149        catch (CommandException ex) {
150            throw new BundleEngineException(ex);
151        }
152        return job.getOrigJobXml();
153    }
154
155    /* (non-Javadoc)
156     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
157     */
158    @Override
159    public WorkflowJob getJob(String jobId) throws BundleEngineException {
160        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
161    }
162
163    /* (non-Javadoc)
164     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
165     */
166    @Override
167    public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException {
168        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
169    }
170
171    /* (non-Javadoc)
172     * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
173     */
174    @Override
175    public String getJobIdForExternalId(String externalId) throws BundleEngineException {
176        return null;
177    }
178
179    /* (non-Javadoc)
180     * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
181     */
182    @Override
183    public void kill(String jobId) throws BundleEngineException {
184        try {
185            new BundleKillXCommand(jobId).call();
186        }
187        catch (CommandException e) {
188            throw new BundleEngineException(e);
189        }
190    }
191
192    /* (non-Javadoc)
193     * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration)
194     */
195    @Override
196    @Deprecated
197    public void reRun(String jobId, Configuration conf) throws BundleEngineException {
198        throw new BundleEngineException(new XException(ErrorCode.E0301, "rerun"));
199    }
200
201    /**
202     * Rerun Bundle actions for given rerunType
203     *
204     * @param jobId bundle job id
205     * @param coordScope the rerun scope for coordinator job names separated by ","
206     * @param dateScope the rerun scope for coordinator nominal times separated by ","
207     * @param refresh true if user wants to refresh input/outpur dataset urls
208     * @param noCleanup false if user wants to cleanup output events for given rerun actions
209     * @throws BaseEngineException thrown if failed to rerun
210     */
211    public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
212            throws BaseEngineException {
213        try {
214            new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call();
215        }
216        catch (CommandException ex) {
217            throw new BaseEngineException(ex);
218        }
219    }
220
221    /* (non-Javadoc)
222     * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
223     */
224    @Override
225    public void resume(String jobId) throws BundleEngineException {
226        BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId);
227        try {
228            resume.call();
229        }
230        catch (CommandException ex) {
231            throw new BundleEngineException(ex);
232        }
233    }
234
235    /* (non-Javadoc)
236     * @see org.apache.oozie.BaseEngine#start(java.lang.String)
237     */
238    @Override
239    public void start(String jobId) throws BundleEngineException {
240        try {
241            new BundleStartXCommand(jobId).call();
242        }
243        catch (CommandException e) {
244            throw new BundleEngineException(e);
245        }
246    }
247
248    protected void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer) throws IOException,
249            BundleEngineException {
250    try {
251            BundleJobBean job;
252            logStreamer.getXLogFilter().setParameter(DagXLogInfoService.JOB, jobId);
253            job = new BundleJobXCommand(jobId).call();
254            Date lastTime = null;
255            if (job.isTerminalStatus()) {
256                lastTime = job.getLastModifiedTime();
257            }
258            if (lastTime == null) {
259                lastTime = new Date();
260            }
261            Services.get().get(XLogStreamingService.class)
262                    .streamLog(logStreamer, job.getCreatedTime(), lastTime, writer);
263        }
264        catch (CommandException ex) {
265            throw new IOException(ex);
266        }
267    }
268
269    /* (non-Javadoc)
270     * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)
271     */
272    @Override
273    public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException {
274        try {
275            String jobId = new BundleSubmitXCommand(conf).call();
276
277            if (startJob) {
278                start(jobId);
279            }
280            return jobId;
281        }
282        catch (CommandException ex) {
283            throw new BundleEngineException(ex);
284        }
285    }
286
287    /* (non-Javadoc)
288     * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
289     */
290    @Override
291    public void suspend(String jobId) throws BundleEngineException {
292        BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId);
293        try {
294            suspend.call();
295        }
296        catch (CommandException ex) {
297            throw new BundleEngineException(ex);
298        }
299    }
300
301    /**
302     * Get bundle jobs
303     *
304     * @param filter the filter string
305     * @param start start location for paging
306     * @param len total length to get
307     * @return bundle job info
308     * @throws BundleEngineException thrown if failed to get bundle job info
309     */
310    public BundleJobInfo getBundleJobs(String filter, int start, int len) throws BundleEngineException {
311        Map<String, List<String>> filterList = parseFilter(filter);
312
313        try {
314            return new BundleJobsXCommand(filterList, start, len).call();
315        }
316        catch (CommandException ex) {
317            throw new BundleEngineException(ex);
318        }
319    }
320
321    /**
322     * Parse filter string to a map with key = filter name and values = filter values
323     *
324     * @param filter the filter string
325     * @return filter key and value map
326     * @throws CoordinatorEngineException thrown if failed to parse filter string
327     */
328    @VisibleForTesting
329    Map<String, List<String>> parseFilter(String filter) throws BundleEngineException {
330        try {
331            return JobsFilterUtils.parseFilter(filter);
332        }
333        catch (ServletException ex) {
334            throw new BundleEngineException(ErrorCode.E0420, filter, ex.getMessage());
335        }
336    }
337
338    /**
339     * Get bulk job response
340     *
341     * @param bulkFilter the filter string
342     * @param start start location for paging
343     * @param len total length to get
344     * @return bulk job info
345     * @throws BundleEngineException thrown if failed to get bulk job info
346     */
347    public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException {
348        Map<String,List<String>> bulkRequestMap = parseBulkFilter(bulkFilter);
349        try {
350            return new BulkJobsXCommand(bulkRequestMap, start, len).call();
351        }
352        catch (CommandException ex) {
353            throw new BundleEngineException(ex);
354        }
355    }
356
357    /**
358     * Parse filter string to a map with key = filter name and values = filter values
359     * Allowed keys are defined as constants on top
360     *
361     * @param bulkParams the filter string
362     * @return filter key-value pair map
363     * @throws BundleEngineException thrown if failed to parse filter string
364     */
365    public static Map<String,List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException {
366
367        Map<String,List<String>> bulkFilter = new HashMap<String,List<String>>();
368        // Functionality can be extended to different job levels - TODO extend filter parser and query
369        // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL
370        if (bulkFilter != null) {
371            StringTokenizer st = new StringTokenizer(bulkParams, ";");
372            while (st.hasMoreTokens()) {
373                String token = st.nextToken();
374                if (token.contains("=")) {
375                    String[] pair = token.split("=");
376                    if (pair.length != 2) {
377                        throw new BundleEngineException(ErrorCode.E0420, token,
378                                "elements must be semicolon-separated name=value pairs");
379                    }
380                    pair[0] = pair[0].toLowerCase();
381                    String[] values = pair[1].split(",");
382                    if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) {
383                        throw new BundleEngineException(ErrorCode.E0420, token, XLog.format("invalid parameter name [{0}]",
384                                pair[0]));
385                    }
386                    // special check and processing for time related params
387                    if (pair[0].contains("time")) {
388                        try {
389                            DateUtils.parseDateUTC(pair[1]);
390                        }
391                        catch (ParseException e) {
392                            throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
393                                    "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1],
394                                    DateUtils.ISO8601_UTC_MASK));
395                        }
396                    }
397                    // special check for action status param
398                    // TODO: when extended for levels other than coord action, check against corresponding level's Status values
399                    if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) {
400                        for(String value : values) {
401                            try {
402                                CoordinatorAction.Status.valueOf(value);
403                            }
404                            catch (IllegalArgumentException ex) {
405                                throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
406                                        "invalid action status [{0}]", value));
407                            }
408                        }
409                    }
410                    // eventually adding into map for all cases e.g. names, times, status
411                    List<String> list = bulkFilter.get(pair[0]);
412                    if (list == null) {
413                        list = new ArrayList<String>();
414                        bulkFilter.put(pair[0], list);
415                    }
416                    for(String value : values) {
417                        value = value.trim();
418                        if(value.isEmpty()) {
419                            throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace");
420                        }
421                        list.add(value);
422                    }
423                } else {
424                    throw new BundleEngineException(ErrorCode.E0420, token,
425                            "elements must be semicolon-separated name=value pairs");
426                }
427            }
428            if (!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE)) {
429                throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE);
430            }
431        }
432        return bulkFilter;
433    }
434
435    /**
436     * Return the status for a Job ID
437     *
438     * @param jobId job Id.
439     * @return the job's status
440     * @throws BundleEngineException thrown if the job's status could not be obtained
441     */
442    @Override
443    public String getJobStatus(String jobId) throws BundleEngineException {
444        try {
445            BundleJobBean bundleJob = BundleJobQueryExecutor.getInstance().get(
446                    BundleJobQueryExecutor.BundleJobQuery.GET_BUNDLE_JOB_STATUS, jobId);
447            return bundleJob.getStatusStr();
448        }
449        catch (JPAExecutorException e) {
450            throw new BundleEngineException(e);
451        }
452    }
453
454    @Override
455    public void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
456        try {
457            new BundleSLAAlertsEnableXCommand(id, actions, dates, childIds).call();
458        }
459        catch (CommandException e) {
460            throw new BundleEngineException(e);
461        }
462    }
463
464    @Override
465    public void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
466        try {
467            new BundleSLAAlertsDisableXCommand(id, actions, dates, childIds).call();
468        }
469        catch (CommandException e) {
470            throw new BundleEngineException(e);
471        }
472    }
473
474    @Override
475    public void changeSLA(String id, String actions, String dates, String childIds, String newParams)
476            throws BaseEngineException {
477        Map<String, String> slaNewParams = null;
478        try {
479
480            if (newParams != null) {
481                slaNewParams = JobUtils.parseChangeValue(newParams);
482            }
483            new BundleSLAChangeXCommand(id, actions, dates, childIds, slaNewParams).call();
484        }
485        catch (CommandException e) {
486            throw new BundleEngineException(e);
487        }
488    }
489
490    /**
491     * return a list of killed Bundle job
492     *
493     * @param filter the filter string for which the bundle jobs are killed
494     * @param start the starting index for bundle jobs
495     * @param len maximum number of jobs to be killed
496     * @return the list of jobs being killed
497     * @throws BundleEngineException thrown if one or more of the jobs cannot be killed
498     */
499    public BundleJobInfo killJobs(String filter, int start, int len) throws BundleEngineException {
500        try {
501            Map<String, List<String>> filterList = parseFilter(filter);
502            BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, OperationType.Kill).call();
503            if (bundleJobInfo == null) {
504                return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0);
505            }
506            return bundleJobInfo;
507        }
508        catch (CommandException ex) {
509            throw new BundleEngineException(ex);
510        }
511    }
512
513    /**
514     * return a list of suspended Bundle job
515     *
516     * @param filter the filter string for which the bundle jobs are suspended
517     * @param start the starting index for bundle jobs
518     * @param len maximum number of jobs to be suspended
519     * @return the list of jobs being suspended
520     * @throws BundleEngineException thrown if one or more of the jobs cannot be suspended
521     */
522    public BundleJobInfo suspendJobs(String filter, int start, int len) throws BundleEngineException {
523        try {
524            Map<String, List<String>> filterList = parseFilter(filter);
525            BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, OperationType.Suspend).call();
526            if (bundleJobInfo == null) {
527                return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0);
528            }
529            return bundleJobInfo;
530        }
531        catch (CommandException ex) {
532            throw new BundleEngineException(ex);
533        }
534    }
535
536    /**
537     * return a list of resumed Bundle job
538     *
539     * @param filter the filter string for which the bundle jobs are resumed
540     * @param start the starting index for bundle jobs
541     * @param len maximum number of jobs to be resumed
542     * @return the list of jobs being resumed
543     * @throws BundleEngineException thrown if one or more of the jobs cannot be resumed
544     */
545    public BundleJobInfo resumeJobs(String filter, int start, int len) throws BundleEngineException {
546        try {
547            Map<String, List<String>> filterList = parseFilter(filter);
548            BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, OperationType.Resume).call();
549            if (bundleJobInfo == null) {
550                return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0);
551            }
552            return bundleJobInfo;
553        }
554        catch (CommandException ex) {
555            throw new BundleEngineException(ex);
556        }
557    }
558}