001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie;
020
021import java.io.IOException;
022import java.io.Writer;
023import java.sql.Timestamp;
024import java.text.ParseException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.Comparator;
028import java.util.Date;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.LinkedHashSet;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036import java.util.StringTokenizer;
037
038import org.apache.commons.lang.StringUtils;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.oozie.client.CoordinatorAction;
041import org.apache.oozie.client.CoordinatorJob;
042import org.apache.oozie.client.OozieClient;
043import org.apache.oozie.client.WorkflowJob;
044import org.apache.oozie.client.rest.RestConstants;
045import org.apache.oozie.command.CommandException;
046import org.apache.oozie.command.OperationType;
047import org.apache.oozie.command.coord.BulkCoordXCommand;
048import org.apache.oozie.command.coord.CoordActionInfoXCommand;
049import org.apache.oozie.command.coord.CoordActionsIgnoreXCommand;
050import org.apache.oozie.command.coord.CoordActionsKillXCommand;
051import org.apache.oozie.command.coord.CoordChangeXCommand;
052import org.apache.oozie.command.coord.CoordJobXCommand;
053import org.apache.oozie.command.coord.CoordJobsXCommand;
054import org.apache.oozie.command.coord.CoordKillXCommand;
055import org.apache.oozie.command.coord.CoordRerunXCommand;
056import org.apache.oozie.command.coord.CoordResumeXCommand;
057import org.apache.oozie.command.coord.CoordSLAAlertsDisableXCommand;
058import org.apache.oozie.command.coord.CoordSLAAlertsEnableXCommand;
059import org.apache.oozie.command.coord.CoordSLAChangeXCommand;
060import org.apache.oozie.command.coord.CoordSubmitXCommand;
061import org.apache.oozie.command.coord.CoordSuspendXCommand;
062import org.apache.oozie.command.coord.CoordUpdateXCommand;
063import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
064import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
065import org.apache.oozie.executor.jpa.JPAExecutorException;
066import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
067import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
068import org.apache.oozie.service.DagXLogInfoService;
069import org.apache.oozie.service.Services;
070import org.apache.oozie.service.XLogStreamingService;
071import org.apache.oozie.util.CoordActionsInDateRange;
072import org.apache.oozie.util.DateUtils;
073import org.apache.oozie.util.JobUtils;
074import org.apache.oozie.util.Pair;
075import org.apache.oozie.util.ParamChecker;
076import org.apache.oozie.util.XLog;
077import org.apache.oozie.util.XLogAuditFilter;
078import org.apache.oozie.util.XLogFilter;
079import org.apache.oozie.util.XLogUserFilterParam;
080
081import com.google.common.annotations.VisibleForTesting;
082
083public class CoordinatorEngine extends BaseEngine {
084    private static final XLog LOG = XLog.getLog(CoordinatorEngine.class);
085    public final static String COORD_ACTIONS_LOG_MAX_COUNT = "oozie.coord.actions.log.max.count";
086    private final static int COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT = 50;
087    private final int maxNumActionsForLog;
088
089    public enum FILTER_COMPARATORS {
090        //This ordering is important, dont change this
091        GREATER_EQUAL(">="), GREATER(">"), LESSTHAN_EQUAL("<="), LESSTHAN("<"), NOT_EQUALS("!="), EQUALS("=");
092
093        private final String sign;
094
095        FILTER_COMPARATORS(String sign) {
096            this.sign = sign;
097        }
098
099        public String getSign() {
100            return sign;
101        }
102    }
103
104    public static final String[] VALID_JOB_FILTERS = {OozieClient.FILTER_STATUS, OozieClient.FILTER_NOMINAL_TIME};
105
106    /**
107     * Create a system Coordinator engine, with no user and no group.
108     */
109    public CoordinatorEngine() {
110        maxNumActionsForLog = Services.get().getConf()
111                .getInt(COORD_ACTIONS_LOG_MAX_COUNT, COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT);
112    }
113
114    /**
115     * Create a Coordinator engine to perform operations on behave of a user.
116     *
117     * @param user user name.
118     */
119    public CoordinatorEngine(String user) {
120        this();
121        this.user = ParamChecker.notEmpty(user, "user");
122    }
123
124    /*
125     * (non-Javadoc)
126     *
127     * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
128     */
129    @Override
130    public String getDefinition(String jobId) throws BaseEngineException {
131        CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
132        return job.getOrigJobXml();
133    }
134
135    /**
136     * @param jobId
137     * @return CoordinatorJobBean
138     * @throws BaseEngineException
139     */
140    private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException {
141        try {
142            return new CoordJobXCommand(jobId).call();
143        }
144        catch (CommandException ex) {
145            throw new BaseEngineException(ex);
146        }
147    }
148
149    /**
150     * @param actionId
151     * @return CoordinatorActionBean
152     * @throws BaseEngineException
153     */
154    public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException {
155        try {
156            return new CoordActionInfoXCommand(actionId).call();
157        }
158        catch (CommandException ex) {
159            throw new BaseEngineException(ex);
160        }
161    }
162
163    /*
164     * (non-Javadoc)
165     *
166     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
167     */
168    @Override
169    public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException {
170        try {
171            return new CoordJobXCommand(jobId).call();
172        }
173        catch (CommandException ex) {
174            throw new BaseEngineException(ex);
175        }
176    }
177
178    /*
179     * (non-Javadoc)
180     *
181     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int)
182     */
183    @Override
184    public CoordinatorJobBean getCoordJob(String jobId, String filter, int offset, int length, boolean desc)
185            throws BaseEngineException {
186        Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = parseJobFilter(filter);
187        try {
188            return new CoordJobXCommand(jobId, filterMap, offset, length, desc).call();
189        }
190        catch (CommandException ex) {
191            throw new BaseEngineException(ex);
192        }
193    }
194
195    /*
196     * (non-Javadoc)
197     *
198     * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
199     */
200    @Override
201    public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException {
202        return null;
203    }
204
205    /*
206     * (non-Javadoc)
207     *
208     * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
209     */
210    @Override
211    public void kill(String jobId) throws CoordinatorEngineException {
212        try {
213            new CoordKillXCommand(jobId).call();
214            LOG.info("User " + user + " killed the Coordinator job " + jobId);
215        }
216        catch (CommandException e) {
217            throw new CoordinatorEngineException(e);
218        }
219    }
220
221    public CoordinatorActionInfo killActions(String jobId, String rangeType, String scope) throws CoordinatorEngineException {
222        try {
223            return new CoordActionsKillXCommand(jobId, rangeType, scope).call();
224        }
225        catch (CommandException e) {
226            throw new CoordinatorEngineException(e);
227        }
228    }
229
230    /* (non-Javadoc)
231     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
232     */
233    @Override
234    public void change(String jobId, String changeValue) throws CoordinatorEngineException {
235        try {
236            new CoordChangeXCommand(jobId, changeValue).call();
237            LOG.info("User " + user + " changed the Coordinator job [" + jobId + "] to " + changeValue);
238        }
239        catch (CommandException e) {
240            throw new CoordinatorEngineException(e);
241        }
242    }
243
244    public CoordinatorActionInfo ignore(String jobId, String type, String scope) throws CoordinatorEngineException {
245        try {
246            LOG.info("User " + user + " ignore a Coordinator Action (s) [" + scope + "] of the Coordinator Job ["
247                    + jobId + "]");
248            return new CoordActionsIgnoreXCommand(jobId, type, scope).call();
249        }
250        catch (CommandException e) {
251            throw new CoordinatorEngineException(e);
252        }
253    }
254
255    @Override
256    @Deprecated
257    public void reRun(String jobId, Configuration conf) throws BaseEngineException {
258        throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of rerun"));
259    }
260
261    /**
262     * Rerun coordinator actions for given rerunType
263     *
264     * @param jobId
265     * @param rerunType
266     * @param scope
267     * @param refresh
268     * @param noCleanup
269     * @throws BaseEngineException
270     */
271    public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup,
272                                       boolean failed, Configuration conf)
273            throws BaseEngineException {
274        try {
275            return new CoordRerunXCommand(jobId, rerunType, scope, refresh,
276                    noCleanup, failed, conf).call();
277        }
278        catch (CommandException ex) {
279            throw new BaseEngineException(ex);
280        }
281    }
282
283    /*
284     * (non-Javadoc)
285     *
286     * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
287     */
288    @Override
289    public void resume(String jobId) throws CoordinatorEngineException {
290        try {
291            new CoordResumeXCommand(jobId).call();
292        }
293        catch (CommandException e) {
294            throw new CoordinatorEngineException(e);
295        }
296    }
297
298    @Override
299    @Deprecated
300    public void start(String jobId) throws BaseEngineException {
301        throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start"));
302    }
303
304    @Override
305    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
306            BaseEngineException {
307        streamJobLog(jobId, writer, params, LOG_TYPE.LOG);
308    }
309
310    @Override
311    public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
312            BaseEngineException {
313        streamJobLog(jobId, writer, params, LOG_TYPE.ERROR_LOG);
314    }
315
316    @Override
317    public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
318    BaseEngineException {
319        try {
320            streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(params)), jobId, writer, params, LOG_TYPE.AUDIT_LOG);
321        }
322        catch (CommandException e) {
323            throw new IOException(e);
324        }
325    }
326
327    private void streamJobLog(String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
328            throws IOException, BaseEngineException {
329        try {
330            streamJobLog(new XLogFilter(new XLogUserFilterParam(params)), jobId, writer, params, logType);
331        }
332        catch (Exception e) {
333            throw new IOException(e);
334        }
335    }
336
337    private void streamJobLog(XLogFilter filter, String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
338            throws IOException, BaseEngineException {
339        try {
340            filter.setParameter(DagXLogInfoService.JOB, jobId);
341            Date lastTime = null;
342            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
343            if (job.isTerminalStatus()) {
344                lastTime = job.getLastModifiedTime();
345            }
346            if (lastTime == null) {
347                lastTime = new Date();
348            }
349            fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
350        }
351        catch (Exception e) {
352            throw new IOException(e);
353        }
354    }
355
356    /**
357     * Add list of actions to the filter based on conditions
358     *
359     * @param jobId Job Id
360     * @param logRetrievalScope Value for the retrieval type
361     * @param logRetrievalType Based on which filter criteria the log is retrieved
362     * @param writer writer to stream the log to
363     * @param params additional parameters from the request
364     * @throws IOException
365     * @throws BaseEngineException
366     * @throws CommandException
367     */
368    public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer,
369            Map<String, String[]> params) throws IOException, BaseEngineException, CommandException {
370
371        Date startTime = null;
372        Date endTime = null;
373        XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params));
374
375        filter.setParameter(DagXLogInfoService.JOB, jobId);
376        if (logRetrievalScope != null && logRetrievalType != null) {
377            // if coordinator action logs are to be retrieved based on action id range
378            if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) {
379                // Use set implementation that maintains order or elements to achieve reproducibility:
380                Set<String> actionSet = new LinkedHashSet<String>();
381                String[] list = logRetrievalScope.split(",");
382                for (String s : list) {
383                    s = s.trim();
384                    if (s.contains("-")) {
385                        String[] range = s.split("-");
386                        if (range.length != 2) {
387                            throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
388                                    + "'");
389                        }
390                        int start;
391                        int end;
392                        try {
393                            start = Integer.parseInt(range[0].trim());
394                        } catch (NumberFormatException ne) {
395                            throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer",
396                                    ne);
397                        }
398                        try {
399                            end = Integer.parseInt(range[1].trim());
400                        } catch (NumberFormatException ne) {
401                            throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer",
402                                    ne);
403                        }
404                        if (start > end) {
405                            throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
406                        }
407                        for (int i = start; i <= end; i++) {
408                            actionSet.add(jobId + "@" + i);
409                        }
410                    }
411                    else {
412                        try {
413                            Integer.parseInt(s);
414                        }
415                        catch (NumberFormatException ne) {
416                            throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
417                                    + "'. Integer only.");
418                        }
419                        actionSet.add(jobId + "@" + s);
420                    }
421                }
422
423                if (actionSet.size() >= maxNumActionsForLog) {
424                    throw new CommandException(ErrorCode.E0302,
425                            "Retrieving log of too many coordinator actions. Max count is "
426                                    + maxNumActionsForLog + " actions");
427                }
428                Iterator<String> actionsIterator = actionSet.iterator();
429                StringBuilder orSeparatedActions = new StringBuilder("");
430                boolean orRequired = false;
431                while (actionsIterator.hasNext()) {
432                    if (orRequired) {
433                        orSeparatedActions.append("|");
434                    }
435                    orSeparatedActions.append(actionsIterator.next().toString());
436                    orRequired = true;
437                }
438                if (actionSet.size() > 1 && orRequired) {
439                    orSeparatedActions.insert(0, "(");
440                    orSeparatedActions.append(")");
441                }
442
443                filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
444                if (actionSet != null && actionSet.size() == 1) {
445                    CoordinatorActionBean actionBean = getCoordAction(actionSet.iterator().next());
446                    startTime = actionBean.getCreatedTime();
447                    endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean
448                            .getLastModifiedTime();
449                    filter.setActionList(true);
450                }
451                else if (actionSet != null && actionSet.size() > 0) {
452                    List<String> tempList = new ArrayList<String>(actionSet);
453                    Collections.sort(tempList, new Comparator<String>() {
454                        public int compare(String a, String b) {
455                            return Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo(
456                                    Integer.valueOf(b.substring(b.lastIndexOf("@") + 1)));
457                        }
458                    });
459                    startTime = getCoordAction(tempList.get(0)).getCreatedTime();
460                    endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, tempList.get(0),
461                            tempList.get(tempList.size() - 1));
462                    filter.setActionList(true);
463                }
464            }
465            // if coordinator action logs are to be retrieved based on date range
466            // this block gets the corresponding list of coordinator actions to be used by the log filter
467            if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) {
468                List<String> coordActionIdList = null;
469                try {
470                    coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId, logRetrievalScope);
471                }
472                catch (XException xe) {
473                    throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe);
474                }
475                if(coordActionIdList.size() >= maxNumActionsForLog) {
476                    throw new CommandException(ErrorCode.E0302,
477                            "Retrieving log of too many coordinator actions. Max count is "
478                                    + maxNumActionsForLog + " actions");
479                }
480                StringBuilder orSeparatedActions = new StringBuilder("");
481                boolean orRequired = false;
482                for (String coordActionId : coordActionIdList) {
483                    if (orRequired) {
484                        orSeparatedActions.append("|");
485                    }
486                    orSeparatedActions.append(coordActionId);
487                    orRequired = true;
488                }
489                if (coordActionIdList.size() > 1 && orRequired) {
490                    orSeparatedActions.insert(0, "(");
491                    orSeparatedActions.append(")");
492                }
493                filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
494                if (coordActionIdList != null && coordActionIdList.size() == 1) {
495                    CoordinatorActionBean actionBean = getCoordAction(coordActionIdList.get(0));
496                    startTime = actionBean.getCreatedTime();
497                    endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean
498                            .getLastModifiedTime();
499                    filter.setActionList(true);
500                }
501                else if (coordActionIdList != null && coordActionIdList.size() > 0) {
502                    Collections.sort(coordActionIdList, new Comparator<String>() {
503                        public int compare(String a, String b) {
504                            return Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo(
505                                    Integer.valueOf(b.substring(b.lastIndexOf("@") + 1)));
506                        }
507                    });
508                    startTime = getCoordAction(coordActionIdList.get(0)).getCreatedTime();
509                    endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, coordActionIdList.get(0),
510                            coordActionIdList.get(coordActionIdList.size() - 1));
511                    filter.setActionList(true);
512                }
513            }
514        }
515        if (startTime == null || endTime == null) {
516            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
517            if (startTime == null) {
518                startTime = job.getCreatedTime();
519            }
520            if (endTime == null) {
521                if (job.isTerminalStatus()) {
522                    endTime = job.getLastModifiedTime();
523                }
524                if (endTime == null) {
525                    endTime = new Date();
526                }
527            }
528        }
529        Services.get().get(XLogStreamingService.class).streamLog(filter, startTime, endTime, writer, params);
530    }
531
532    /*
533     * (non-Javadoc)
534     *
535     * @see
536     * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration
537     * , boolean)
538     */
539    @Override
540    public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException {
541        try {
542            CoordSubmitXCommand submit = new CoordSubmitXCommand(conf);
543            return submit.call();
544        }
545        catch (CommandException ex) {
546            throw new CoordinatorEngineException(ex);
547        }
548    }
549
550    /*
551     * (non-Javadoc)
552     *
553     * @see
554     * org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
555     */
556    @Override
557    public String dryRunSubmit(Configuration conf) throws CoordinatorEngineException {
558        try {
559            CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf);
560            return submit.call();
561        }
562        catch (CommandException ex) {
563            throw new CoordinatorEngineException(ex);
564        }
565    }
566
567    /*
568     * (non-Javadoc)
569     *
570     * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
571     */
572    @Override
573    public void suspend(String jobId) throws CoordinatorEngineException {
574        try {
575            new CoordSuspendXCommand(jobId).call();
576        }
577        catch (CommandException e) {
578            throw new CoordinatorEngineException(e);
579        }
580
581    }
582
583    /*
584     * (non-Javadoc)
585     *
586     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
587     */
588    @Override
589    public WorkflowJob getJob(String jobId) throws BaseEngineException {
590        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
591    }
592
593    /*
594     * (non-Javadoc)
595     *
596     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
597     */
598    @Override
599    public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException {
600        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
601    }
602
603    private static final Set<String> FILTER_NAMES = new HashSet<String>();
604
605    static {
606        FILTER_NAMES.add(OozieClient.FILTER_USER);
607        FILTER_NAMES.add(OozieClient.FILTER_NAME);
608        FILTER_NAMES.add(OozieClient.FILTER_GROUP);
609        FILTER_NAMES.add(OozieClient.FILTER_STATUS);
610        FILTER_NAMES.add(OozieClient.FILTER_ID);
611        FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY);
612        FILTER_NAMES.add(OozieClient.FILTER_UNIT);
613        FILTER_NAMES.add(OozieClient.FILTER_SORT_BY);
614        FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_START);
615        FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_END);
616    }
617
618    /**
619     * @param filter
620     * @param start
621     * @param len
622     * @return CoordinatorJobInfo
623     * @throws CoordinatorEngineException
624     */
625    public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException {
626        Map<String, List<String>> filterList = parseJobsFilter(filter);
627
628        try {
629            return new CoordJobsXCommand(filterList, start, len).call();
630        }
631        catch (CommandException ex) {
632            throw new CoordinatorEngineException(ex);
633        }
634    }
635
636    // Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values
637    public Map<Pair<String, FILTER_COMPARATORS>, List<Object>> parseJobFilter(String filter) throws
638        CoordinatorEngineException {
639        Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = new HashMap<Pair<String,
640            FILTER_COMPARATORS>, List<Object>>();
641        if (filter != null) {
642            //split name value pairs
643            StringTokenizer st = new StringTokenizer(filter, ";");
644            while (st.hasMoreTokens()) {
645                String token = st.nextToken().trim();
646                Pair<String, FILTER_COMPARATORS> pair = null;
647                for (FILTER_COMPARATORS comp : FILTER_COMPARATORS.values()) {
648                    if (token.contains(comp.getSign())) {
649                        int index = token.indexOf(comp.getSign());
650                        String key = token.substring(0, index);
651                        String valueStr = token.substring(index + comp.getSign().length());
652                        Object value;
653
654                        if (key.equalsIgnoreCase(OozieClient.FILTER_STATUS)) {
655                            value = valueStr.toUpperCase();
656                            try {
657                                CoordinatorAction.Status.valueOf((String) value);
658                            } catch (IllegalArgumentException ex) {
659                                // Check for incorrect status value
660                                throw new CoordinatorEngineException(ErrorCode.E0421, filter,
661                                    XLog.format("invalid status value [{0}]." + " Valid status values are: [{1}]",
662                                        valueStr, StringUtils.join(CoordinatorAction.Status.values(), ", ")));
663                            }
664
665                            if (!(comp == FILTER_COMPARATORS.EQUALS || comp == FILTER_COMPARATORS.NOT_EQUALS)) {
666                                throw new CoordinatorEngineException(ErrorCode.E0421, filter,
667                                    XLog.format("invalid comparator [{0}] for status." + " Valid are = and !=",
668                                        comp.getSign()));
669                            }
670
671                            pair = Pair.of(OozieClient.FILTER_STATUS, comp);
672                        } else if (key.equalsIgnoreCase(OozieClient.FILTER_NOMINAL_TIME)) {
673                            try {
674                                value = new Timestamp(DateUtils.parseDateUTC(valueStr).getTime());
675                            } catch (ParseException e) {
676                                throw new CoordinatorEngineException(ErrorCode.E0421, filter,
677                                    XLog.format("invalid nominal time [{0}]." + " Valid format: " +
678                                            "[{1}]", valueStr, DateUtils.ISO8601_UTC_MASK));
679                            }
680                            pair = Pair.of(OozieClient.FILTER_NOMINAL_TIME, comp);
681                        } else {
682                            // Check for incorrect filter option
683                            throw new CoordinatorEngineException(ErrorCode.E0421, filter,
684                                XLog.format("invalid filter [{0}]." + " Valid filters [{1}]", key, StringUtils.join
685                                    (VALID_JOB_FILTERS, ", ")));
686                        }
687                        if (!filterMap.containsKey(pair)) {
688                            filterMap.put(pair, new ArrayList<Object>());
689                        }
690                        filterMap.get(pair).add(value);
691                        break;
692                    }
693                }
694
695                if (pair == null) {
696                    //token doesn't contain comparator
697                    throw new CoordinatorEngineException(ErrorCode.E0421, filter,
698                        "filter should be of format <key><comparator><value> pairs");
699                }
700            }
701        }
702        return filterMap;
703    }
704
705    /**
706     * @param filter
707     * @return Map<String, List<String>>
708     * @throws CoordinatorEngineException
709     */
710    @VisibleForTesting
711    Map<String, List<String>> parseJobsFilter(String filter) throws CoordinatorEngineException {
712        Map<String, List<String>> map = new HashMap<String, List<String>>();
713        boolean isTimeUnitSpecified = false;
714        String timeUnit = "MINUTE";
715        boolean isFrequencySpecified = false;
716        String frequency = "";
717        if (filter != null) {
718            StringTokenizer st = new StringTokenizer(filter, ";");
719            while (st.hasMoreTokens()) {
720                String token = st.nextToken();
721                if (token.contains("=")) {
722                    String[] pair = token.split("=");
723                    if (pair.length != 2) {
724                        throw new CoordinatorEngineException(ErrorCode.E0420, filter,
725                                "elements must be semicolon-separated name=value pairs");
726                    }
727                    pair[0] = pair[0].toLowerCase();
728                    if (!FILTER_NAMES.contains(pair[0])) {
729                        throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
730                                pair[0]));
731                    }
732                    if (pair[0].equalsIgnoreCase("frequency")) {
733                        isFrequencySpecified = true;
734                        try {
735                            frequency = (int) Float.parseFloat(pair[1]) + "";
736                            continue;
737                        }
738                        catch (NumberFormatException NANException) {
739                            throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
740                                    "invalid value [{0}] for frequency. A numerical value is expected", pair[1]));
741                        }
742                    }
743                    if (pair[0].equalsIgnoreCase("unit")) {
744                        isTimeUnitSpecified = true;
745                        timeUnit = pair[1];
746                        if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days")
747                                && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) {
748                            throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
749                                    "invalid value [{0}] for time unit. "
750                                            + "Valid value is one of months, days, hours or minutes", pair[1]));
751                        }
752                        continue;
753                    }
754                    if (pair[0].equals("status")) {
755                        try {
756                            CoordinatorJob.Status.valueOf(pair[1]);
757                        }
758                        catch (IllegalArgumentException ex) {
759                            throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
760                                    "invalid status [{0}]", pair[1]));
761                        }
762                    }
763                    List<String> list = map.get(pair[0]);
764                    if (list == null) {
765                        list = new ArrayList<String>();
766                        map.put(pair[0], list);
767                    }
768                    list.add(pair[1]);
769                } else {
770                    throw new CoordinatorEngineException(ErrorCode.E0420, filter,
771                            "elements must be semicolon-separated name=value pairs");
772                }
773            }
774            // Unit is specified and frequency is not specified
775            if (!isFrequencySpecified && isTimeUnitSpecified) {
776                throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when "
777                        + "frequency is specified. Either specify frequency also or else remove the time unit");
778            } else if (isFrequencySpecified) {
779                // Frequency value is specified
780                if (isTimeUnitSpecified) {
781                    if (timeUnit.equalsIgnoreCase("months")) {
782                        timeUnit = "MONTH";
783                    } else if (timeUnit.equalsIgnoreCase("days")) {
784                        timeUnit = "DAY";
785                    } else if (timeUnit.equalsIgnoreCase("hours")) {
786                        // When job details are persisted to database, frequency in hours are converted to minutes.
787                        // This conversion is to conform with that.
788                        frequency = Integer.parseInt(frequency) * 60 + "";
789                        timeUnit = "MINUTE";
790                    } else if (timeUnit.equalsIgnoreCase("minutes")) {
791                        timeUnit = "MINUTE";
792                    }
793                }
794                // Adding the frequency and time unit filters to the filter map
795                List<String> list = new ArrayList<String>();
796                list.add(timeUnit);
797                map.put("unit", list);
798                list = new ArrayList<String>();
799                list.add(frequency);
800                map.put("frequency", list);
801            }
802        }
803        return map;
804    }
805
806    public List<WorkflowJobBean> getReruns(String coordActionId) throws CoordinatorEngineException {
807        List<WorkflowJobBean> wfBeans;
808        try {
809            wfBeans = WorkflowJobQueryExecutor.getInstance().getList(WorkflowJobQuery.GET_WORKFLOWS_PARENT_COORD_RERUN,
810                    coordActionId);
811        }
812        catch (JPAExecutorException e) {
813            throw new CoordinatorEngineException(e);
814        }
815        return wfBeans;
816    }
817
818    /**
819     * Update coord job definition.
820     *
821     * @param conf the conf
822     * @param jobId the job id
823     * @param dryrun the dryrun
824     * @param showDiff the show diff
825     * @return the string
826     * @throws CoordinatorEngineException the coordinator engine exception
827     */
828    public String updateJob(Configuration conf, String jobId, boolean dryrun, boolean showDiff)
829            throws CoordinatorEngineException {
830        try {
831            CoordUpdateXCommand update = new CoordUpdateXCommand(dryrun, conf, jobId, showDiff);
832            return update.call();
833        }
834        catch (CommandException ex) {
835            throw new CoordinatorEngineException(ex);
836        }
837    }
838
839    /**
840     * Return the status for a Job ID
841     *
842     * @param jobId job Id.
843     * @return the job's status
844     * @throws CoordinatorEngineException thrown if the job's status could not be obtained
845     */
846    @Override
847    public String getJobStatus(String jobId) throws CoordinatorEngineException {
848        try {
849            CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get(
850                    CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB_STATUS, jobId);
851            return coordJob.getStatusStr();
852        }
853        catch (JPAExecutorException e) {
854            throw new CoordinatorEngineException(e);
855        }
856    }
857
858    /**
859     * Return the status for an Action ID
860     *
861     * @param actionId action Id.
862     * @return the action's status
863     * @throws CoordinatorEngineException thrown if the action's status could not be obtained
864     */
865    public String getActionStatus(String actionId) throws CoordinatorEngineException {
866        try {
867            CoordinatorActionBean coordAction = CoordActionQueryExecutor.getInstance().get(
868                    CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION_STATUS, actionId);
869            return coordAction.getStatusStr();
870        }
871        catch (JPAExecutorException e) {
872            throw new CoordinatorEngineException(e);
873        }
874    }
875
876    @Override
877    public void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
878        try {
879            new CoordSLAAlertsDisableXCommand(id, actions, dates).call();
880
881        }
882        catch (CommandException e) {
883            throw new CoordinatorEngineException(e);
884        }
885    }
886
887    @Override
888    public void changeSLA(String id, String actions, String dates, String childIds, String newParams)
889            throws BaseEngineException {
890        Map<String, String> slaNewParams = null;
891
892        try {
893
894            if (newParams != null) {
895                slaNewParams = JobUtils.parseChangeValue(newParams);
896            }
897
898            new CoordSLAChangeXCommand(id, actions, dates, slaNewParams).call();
899
900        }
901        catch (CommandException e) {
902            throw new CoordinatorEngineException(e);
903        }
904    }
905
906    @Override
907    public void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
908        try {
909            new CoordSLAAlertsEnableXCommand(id, actions, dates).call();
910
911        }
912        catch (CommandException e) {
913            throw new CoordinatorEngineException(e);
914        }
915    }
916
917    /**
918     * return a list of killed Coordinator job
919     *
920     * @param filter, the filter string for which the coordinator jobs are killed
921     * @param start, the starting index for coordinator jobs
922     * @param length, maximum number of jobs to be killed
923     * @return the list of jobs being killed
924     * @throws CoordinatorEngineException thrown if one or more of the jobs cannot be killed
925     */
926    public CoordinatorJobInfo killJobs(String filter, int start, int length) throws CoordinatorEngineException {
927        try {
928            Map<String, List<String>> filterMap = parseJobsFilter(filter);
929            CoordinatorJobInfo coordinatorJobInfo =
930                    new BulkCoordXCommand(filterMap, start, length, OperationType.Kill).call();
931            if (coordinatorJobInfo == null) {
932                return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0);
933            }
934            return coordinatorJobInfo;
935        }
936        catch (CommandException ex) {
937            throw new CoordinatorEngineException(ex);
938        }
939    }
940
941    /**
942     * return the jobs that've been suspended
943     * @param filter Filter for jobs that will be suspended, can be name, user, group, status, id or combination of any
944     * @param start Offset for the jobs that will be suspended
945     * @param length maximum number of jobs that will be suspended
946     * @return
947     * @throws CoordinatorEngineException
948     */
949    public CoordinatorJobInfo suspendJobs(String filter, int start, int length) throws CoordinatorEngineException {
950        try {
951            Map<String, List<String>> filterMap = parseJobsFilter(filter);
952            CoordinatorJobInfo coordinatorJobInfo =
953                    new BulkCoordXCommand(filterMap, start, length, OperationType.Suspend).call();
954            if (coordinatorJobInfo == null) {
955                return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0);
956            }
957            return coordinatorJobInfo;
958        }
959        catch (CommandException ex) {
960            throw new CoordinatorEngineException(ex);
961        }
962    }
963
964    /**
965     * return the jobs that've been resumed
966     * @param filter Filter for jobs that will be resumed, can be name, user, group, status, id or combination of any
967     * @param start Offset for the jobs that will be resumed
968     * @param length maximum number of jobs that will be resumed
969     * @return
970     * @throws CoordinatorEngineException
971     */
972    public CoordinatorJobInfo resumeJobs(String filter, int start, int length) throws CoordinatorEngineException {
973        try {
974            Map<String, List<String>> filterMap = parseJobsFilter(filter);
975            CoordinatorJobInfo coordinatorJobInfo =
976                    new BulkCoordXCommand(filterMap, start, length, OperationType.Resume).call();
977            if (coordinatorJobInfo == null) {
978                return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0);
979            }
980            return coordinatorJobInfo;
981        }
982        catch (CommandException ex) {
983            throw new CoordinatorEngineException(ex);
984        }
985    }
986}