001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie;
020
021import java.io.IOException;
022import java.io.Writer;
023import java.sql.Timestamp;
024import java.text.ParseException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.Comparator;
028import java.util.Date;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.LinkedHashSet;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036import java.util.StringTokenizer;
037
038import org.apache.commons.lang.StringUtils;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.oozie.client.CoordinatorAction;
041import org.apache.oozie.client.CoordinatorJob;
042import org.apache.oozie.client.OozieClient;
043import org.apache.oozie.client.WorkflowJob;
044import org.apache.oozie.client.rest.RestConstants;
045import org.apache.oozie.command.CommandException;
046import org.apache.oozie.command.OperationType;
047import org.apache.oozie.command.coord.BulkCoordXCommand;
048import org.apache.oozie.command.coord.CoordActionInfoXCommand;
049import org.apache.oozie.command.coord.CoordActionsIgnoreXCommand;
050import org.apache.oozie.command.coord.CoordActionsKillXCommand;
051import org.apache.oozie.command.coord.CoordChangeXCommand;
052import org.apache.oozie.command.coord.CoordActionMissingDependenciesXCommand;
053import org.apache.oozie.command.coord.CoordJobXCommand;
054import org.apache.oozie.command.coord.CoordJobsXCommand;
055import org.apache.oozie.command.coord.CoordKillXCommand;
056import org.apache.oozie.command.coord.CoordRerunXCommand;
057import org.apache.oozie.command.coord.CoordResumeXCommand;
058import org.apache.oozie.command.coord.CoordSLAAlertsDisableXCommand;
059import org.apache.oozie.command.coord.CoordSLAAlertsEnableXCommand;
060import org.apache.oozie.command.coord.CoordSLAChangeXCommand;
061import org.apache.oozie.command.coord.CoordSubmitXCommand;
062import org.apache.oozie.command.coord.CoordSuspendXCommand;
063import org.apache.oozie.command.coord.CoordUpdateXCommand;
064import org.apache.oozie.command.coord.CoordWfActionInfoXCommand;
065import org.apache.oozie.dependency.ActionDependency;
066import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
067import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
068import org.apache.oozie.executor.jpa.JPAExecutorException;
069import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
070import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
071import org.apache.oozie.service.DagXLogInfoService;
072import org.apache.oozie.service.Services;
073import org.apache.oozie.service.XLogStreamingService;
074import org.apache.oozie.util.CoordActionsInDateRange;
075import org.apache.oozie.util.DateUtils;
076import org.apache.oozie.util.JobUtils;
077import org.apache.oozie.util.Pair;
078import org.apache.oozie.util.ParamChecker;
079import org.apache.oozie.util.XLog;
080import org.apache.oozie.util.XLogFilter;
081import org.apache.oozie.util.XLogStreamer;
082import org.apache.oozie.util.XLogUserFilterParam;
083
084import com.google.common.annotations.VisibleForTesting;
085
086public class CoordinatorEngine extends BaseEngine {
087    private static final XLog LOG = XLog.getLog(CoordinatorEngine.class);
088    public final static String COORD_ACTIONS_LOG_MAX_COUNT = "oozie.coord.actions.log.max.count";
089    private final static int COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT = 50;
090    private final int maxNumActionsForLog;
091
092    public enum FILTER_COMPARATORS {
093        //This ordering is important, dont change this
094        GREATER_EQUAL(">="), GREATER(">"), LESSTHAN_EQUAL("<="), LESSTHAN("<"), NOT_EQUALS("!="), EQUALS("=");
095
096        private final String sign;
097
098        FILTER_COMPARATORS(String sign) {
099            this.sign = sign;
100        }
101
102        public String getSign() {
103            return sign;
104        }
105    }
106
107    public static final String[] VALID_JOB_FILTERS = {OozieClient.FILTER_STATUS, OozieClient.FILTER_NOMINAL_TIME};
108
109    /**
110     * Create a system Coordinator engine, with no user and no group.
111     */
112    public CoordinatorEngine() {
113        maxNumActionsForLog = Services.get().getConf()
114                .getInt(COORD_ACTIONS_LOG_MAX_COUNT, COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT);
115    }
116
117    /**
118     * Create a Coordinator engine to perform operations on behave of a user.
119     *
120     * @param user user name.
121     */
122    public CoordinatorEngine(String user) {
123        this();
124        this.user = ParamChecker.notEmpty(user, "user");
125    }
126
127    /*
128     * (non-Javadoc)
129     *
130     * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
131     */
132    @Override
133    public String getDefinition(String jobId) throws BaseEngineException {
134        CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
135        return job.getOrigJobXml();
136    }
137
138    /**
139     * @param jobId the job ID
140     * @return CoordinatorJobBean
141     * @throws BaseEngineException if the bean could not be retrieved
142     */
143    private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException {
144        try {
145            return new CoordJobXCommand(jobId).call();
146        }
147        catch (CommandException ex) {
148            throw new BaseEngineException(ex);
149        }
150    }
151
152    /**
153     * @param actionId the ID of the action
154     * @return CoordinatorActionBean
155     * @throws BaseEngineException if the bean could not be retrieved
156     */
157    public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException {
158        try {
159            return new CoordActionInfoXCommand(actionId).call();
160        }
161        catch (CommandException ex) {
162            throw new BaseEngineException(ex);
163        }
164    }
165
166    /*
167     * (non-Javadoc)
168     *
169     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
170     */
171    @Override
172    public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException {
173        try {
174            return new CoordJobXCommand(jobId).call();
175        }
176        catch (CommandException ex) {
177            throw new BaseEngineException(ex);
178        }
179    }
180
181    /*
182     * (non-Javadoc)
183     *
184     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int)
185     */
186    @Override
187    public CoordinatorJobBean getCoordJob(String jobId, String filter, int offset, int length, boolean desc)
188            throws BaseEngineException {
189        Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = parseJobFilter(filter);
190        try {
191            return new CoordJobXCommand(jobId, filterMap, offset, length, desc).call();
192        }
193        catch (CommandException ex) {
194            throw new BaseEngineException(ex);
195        }
196    }
197
198    /*
199     * (non-Javadoc)
200     *
201     * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
202     */
203    @Override
204    public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException {
205        return null;
206    }
207
208    /*
209     * (non-Javadoc)
210     *
211     * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
212     */
213    @Override
214    public void kill(String jobId) throws CoordinatorEngineException {
215        try {
216            new CoordKillXCommand(jobId).call();
217            LOG.info("User " + user + " killed the Coordinator job " + jobId);
218        }
219        catch (CommandException e) {
220            throw new CoordinatorEngineException(e);
221        }
222    }
223
224    public CoordinatorActionInfo killActions(String jobId, String rangeType, String scope) throws CoordinatorEngineException {
225        try {
226            return new CoordActionsKillXCommand(jobId, rangeType, scope).call();
227        }
228        catch (CommandException e) {
229            throw new CoordinatorEngineException(e);
230        }
231    }
232
233    /* (non-Javadoc)
234     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
235     */
236    @Override
237    public void change(String jobId, String changeValue) throws CoordinatorEngineException {
238        try {
239            new CoordChangeXCommand(jobId, changeValue).call();
240            LOG.info("User " + user + " changed the Coordinator job [" + jobId + "] to " + changeValue);
241        }
242        catch (CommandException e) {
243            throw new CoordinatorEngineException(e);
244        }
245    }
246
247    public CoordinatorActionInfo ignore(String jobId, String type, String scope) throws CoordinatorEngineException {
248        try {
249            LOG.info("User " + user + " ignore a Coordinator Action (s) [" + scope + "] of the Coordinator Job ["
250                    + jobId + "]");
251            return new CoordActionsIgnoreXCommand(jobId, type, scope).call();
252        }
253        catch (CommandException e) {
254            throw new CoordinatorEngineException(e);
255        }
256    }
257
258    @Override
259    @Deprecated
260    public void reRun(String jobId, Configuration conf) throws BaseEngineException {
261        throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of rerun"));
262    }
263
264    /**
265     * Rerun coordinator actions for given rerunType
266     *
267     * @param jobId the job ID
268     * @param rerunType rerun type {@link RestConstants#JOB_COORD_SCOPE_DATE} or {@link RestConstants#JOB_COORD_SCOPE_ACTION}
269     * @param scope the rerun scope for given rerunType separated by ","
270     * @param refresh true if user wants to refresh input/output dataset urls
271     * @param noCleanup false if user wants to cleanup output events for given rerun actions
272     * @param failed true if user wants to rerun only failed nodes
273     * @param conf configuration values for actions
274     * @return  the action info
275     * @throws BaseEngineException thrown if the actions could not be rerun
276     */
277    public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup,
278                                       boolean failed, Configuration conf)
279            throws BaseEngineException {
280        try {
281            return new CoordRerunXCommand(jobId, rerunType, scope, refresh,
282                    noCleanup, failed, conf).call();
283        }
284        catch (CommandException ex) {
285            throw new BaseEngineException(ex);
286        }
287    }
288
289    /*
290     * (non-Javadoc)
291     *
292     * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
293     */
294    @Override
295    public void resume(String jobId) throws CoordinatorEngineException {
296        try {
297            new CoordResumeXCommand(jobId).call();
298        }
299        catch (CommandException e) {
300            throw new CoordinatorEngineException(e);
301        }
302    }
303
304    @Override
305    @Deprecated
306    public void start(String jobId) throws BaseEngineException {
307        throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start"));
308    }
309
310
311    @Override
312    protected void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer)
313            throws IOException, BaseEngineException {
314        logStreamer.getXLogFilter().setParameter(DagXLogInfoService.JOB, jobId);
315        Date lastTime = null;
316        CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
317        if (job.isTerminalStatus()) {
318            lastTime = job.getLastModifiedTime();
319        }
320        if (lastTime == null) {
321            lastTime = new Date();
322        }
323        Services.get().get(XLogStreamingService.class).streamLog(logStreamer, job.getCreatedTime(), lastTime, writer);
324    }
325
326    /**
327     * Add list of actions to the filter based on conditions
328     *
329     * @param jobId Job Id
330     * @param logRetrievalScope Value for the retrieval type
331     * @param logRetrievalType Based on which filter criteria the log is retrieved
332     * @param writer writer to stream the log to
333     * @param requestParameters additional parameters from the request
334     * @throws IOException in case of IO error
335     * @throws BaseEngineException if there is an error during streaming
336     * @throws CommandException if a parameter could not be parsed
337     */
338    public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer,
339            Map<String, String[]> requestParameters) throws IOException, BaseEngineException, CommandException {
340
341        Date startTime = null;
342        Date endTime = null;
343        XLogFilter filter = new XLogFilter(new XLogUserFilterParam(requestParameters));
344
345        filter.setParameter(DagXLogInfoService.JOB, jobId);
346        if (logRetrievalScope != null && logRetrievalType != null) {
347            // if coordinator action logs are to be retrieved based on action id range
348            if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) {
349                // Use set implementation that maintains order or elements to achieve reproducibility:
350                Set<String> actionSet = new LinkedHashSet<String>();
351                String[] list = logRetrievalScope.split(",");
352                for (String s : list) {
353                    s = s.trim();
354                    if (s.contains("-")) {
355                        String[] range = s.split("-");
356                        if (range.length != 2) {
357                            throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
358                                    + "'");
359                        }
360                        int start;
361                        int end;
362                        try {
363                            start = Integer.parseInt(range[0].trim());
364                        } catch (NumberFormatException ne) {
365                            throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer",
366                                    ne);
367                        }
368                        try {
369                            end = Integer.parseInt(range[1].trim());
370                        } catch (NumberFormatException ne) {
371                            throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer",
372                                    ne);
373                        }
374                        if (start > end) {
375                            throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
376                        }
377                        for (int i = start; i <= end; i++) {
378                            actionSet.add(jobId + "@" + i);
379                        }
380                    }
381                    else {
382                        try {
383                            Integer.parseInt(s);
384                        }
385                        catch (NumberFormatException ne) {
386                            throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
387                                    + "'. Integer only.");
388                        }
389                        actionSet.add(jobId + "@" + s);
390                    }
391                }
392
393                if (actionSet.size() >= maxNumActionsForLog) {
394                    throw new CommandException(ErrorCode.E0302,
395                            "Retrieving log of too many coordinator actions. Max count is "
396                                    + maxNumActionsForLog + " actions");
397                }
398                Iterator<String> actionsIterator = actionSet.iterator();
399                StringBuilder orSeparatedActions = new StringBuilder("");
400                boolean orRequired = false;
401                while (actionsIterator.hasNext()) {
402                    if (orRequired) {
403                        orSeparatedActions.append("|");
404                    }
405                    orSeparatedActions.append(actionsIterator.next().toString());
406                    orRequired = true;
407                }
408                if (actionSet.size() > 1 && orRequired) {
409                    orSeparatedActions.insert(0, "(");
410                    orSeparatedActions.append(")");
411                }
412
413                filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
414                if (actionSet != null && actionSet.size() == 1) {
415                    CoordinatorActionBean actionBean = getCoordAction(actionSet.iterator().next());
416                    startTime = actionBean.getCreatedTime();
417                    endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean
418                            .getLastModifiedTime();
419                    filter.setActionList(true);
420                }
421                else if (actionSet != null && actionSet.size() > 0) {
422                    List<String> tempList = new ArrayList<String>(actionSet);
423                    Collections.sort(tempList, new Comparator<String>() {
424                        public int compare(String a, String b) {
425                            return Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo(
426                                    Integer.valueOf(b.substring(b.lastIndexOf("@") + 1)));
427                        }
428                    });
429                    startTime = getCoordAction(tempList.get(0)).getCreatedTime();
430                    endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, tempList.get(0),
431                            tempList.get(tempList.size() - 1));
432                    filter.setActionList(true);
433                }
434            }
435            // if coordinator action logs are to be retrieved based on date range
436            // this block gets the corresponding list of coordinator actions to be used by the log filter
437            if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) {
438                List<String> coordActionIdList = null;
439                try {
440                    coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId, logRetrievalScope);
441                }
442                catch (XException xe) {
443                    throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe);
444                }
445                if(coordActionIdList.size() >= maxNumActionsForLog) {
446                    throw new CommandException(ErrorCode.E0302,
447                            "Retrieving log of too many coordinator actions. Max count is "
448                                    + maxNumActionsForLog + " actions");
449                }
450                StringBuilder orSeparatedActions = new StringBuilder("");
451                boolean orRequired = false;
452                for (String coordActionId : coordActionIdList) {
453                    if (orRequired) {
454                        orSeparatedActions.append("|");
455                    }
456                    orSeparatedActions.append(coordActionId);
457                    orRequired = true;
458                }
459                if (coordActionIdList.size() > 1 && orRequired) {
460                    orSeparatedActions.insert(0, "(");
461                    orSeparatedActions.append(")");
462                }
463                filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
464                if (coordActionIdList != null && coordActionIdList.size() == 1) {
465                    CoordinatorActionBean actionBean = getCoordAction(coordActionIdList.get(0));
466                    startTime = actionBean.getCreatedTime();
467                    endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean
468                            .getLastModifiedTime();
469                    filter.setActionList(true);
470                }
471                else if (coordActionIdList != null && coordActionIdList.size() > 0) {
472                    Collections.sort(coordActionIdList, new Comparator<String>() {
473                        public int compare(String a, String b) {
474                            return Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo(
475                                    Integer.valueOf(b.substring(b.lastIndexOf("@") + 1)));
476                        }
477                    });
478                    startTime = getCoordAction(coordActionIdList.get(0)).getCreatedTime();
479                    endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, coordActionIdList.get(0),
480                            coordActionIdList.get(coordActionIdList.size() - 1));
481                    filter.setActionList(true);
482                }
483            }
484        }
485        if (startTime == null || endTime == null) {
486            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
487            if (startTime == null) {
488                startTime = job.getCreatedTime();
489            }
490            if (endTime == null) {
491                if (job.isTerminalStatus()) {
492                    endTime = job.getLastModifiedTime();
493                }
494                if (endTime == null) {
495                    endTime = new Date();
496                }
497            }
498        }
499        Services.get().get(XLogStreamingService.class).streamLog(new XLogStreamer(filter, requestParameters), startTime,
500                endTime, writer);
501    }
502
503    /*
504     * (non-Javadoc)
505     *
506     * @see
507     * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration
508     * , boolean)
509     */
510    @Override
511    public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException {
512        try {
513            CoordSubmitXCommand submit = new CoordSubmitXCommand(conf);
514            return submit.call();
515        }
516        catch (CommandException ex) {
517            throw new CoordinatorEngineException(ex);
518        }
519    }
520
521    /*
522     * (non-Javadoc)
523     *
524     * @see
525     * org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
526     */
527    @Override
528    public String dryRunSubmit(Configuration conf) throws CoordinatorEngineException {
529        try {
530            CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf);
531            return submit.call();
532        }
533        catch (CommandException ex) {
534            throw new CoordinatorEngineException(ex);
535        }
536    }
537
538    /*
539     * (non-Javadoc)
540     *
541     * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
542     */
543    @Override
544    public void suspend(String jobId) throws CoordinatorEngineException {
545        try {
546            new CoordSuspendXCommand(jobId).call();
547        }
548        catch (CommandException e) {
549            throw new CoordinatorEngineException(e);
550        }
551
552    }
553
554    /*
555     * (non-Javadoc)
556     *
557     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
558     */
559    @Override
560    public WorkflowJob getJob(String jobId) throws BaseEngineException {
561        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
562    }
563
564    /*
565     * (non-Javadoc)
566     *
567     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
568     */
569    @Override
570    public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException {
571        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
572    }
573
574    private static final Set<String> FILTER_NAMES = new HashSet<String>();
575
576    static {
577        FILTER_NAMES.add(OozieClient.FILTER_USER);
578        FILTER_NAMES.add(OozieClient.FILTER_NAME);
579        FILTER_NAMES.add(OozieClient.FILTER_GROUP);
580        FILTER_NAMES.add(OozieClient.FILTER_STATUS);
581        FILTER_NAMES.add(OozieClient.FILTER_ID);
582        FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY);
583        FILTER_NAMES.add(OozieClient.FILTER_UNIT);
584        FILTER_NAMES.add(OozieClient.FILTER_SORT_BY);
585        FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_START);
586        FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_END);
587        FILTER_NAMES.add(OozieClient.FILTER_TEXT);
588    }
589
590    /**
591     * @param filter he filter to parse. Elements must be semicolon-separated name=value pairs.
592     *               Supported names are in{@link CoordinatorEngine#FILTER_NAMES}.
593     * @param start start from this job in the coordinator
594     * @param len maximum number of results
595     * @return CoordinatorJobInfo
596     * @throws CoordinatorEngineException if the job info could no be retrieved
597     */
598    public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException {
599        Map<String, List<String>> filterList = parseJobsFilter(filter);
600
601        try {
602            return new CoordJobsXCommand(filterList, start, len).call();
603        }
604        catch (CommandException ex) {
605            throw new CoordinatorEngineException(ex);
606        }
607    }
608
609    // Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values
610    public Map<Pair<String, FILTER_COMPARATORS>, List<Object>> parseJobFilter(String filter) throws
611        CoordinatorEngineException {
612        Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = new HashMap<Pair<String,
613            FILTER_COMPARATORS>, List<Object>>();
614        if (filter != null) {
615            //split name value pairs
616            StringTokenizer st = new StringTokenizer(filter, ";");
617            while (st.hasMoreTokens()) {
618                String token = st.nextToken().trim();
619                Pair<String, FILTER_COMPARATORS> pair = null;
620                for (FILTER_COMPARATORS comp : FILTER_COMPARATORS.values()) {
621                    if (token.contains(comp.getSign())) {
622                        int index = token.indexOf(comp.getSign());
623                        String key = token.substring(0, index);
624                        String valueStr = token.substring(index + comp.getSign().length());
625                        Object value;
626
627                        if (key.equalsIgnoreCase(OozieClient.FILTER_STATUS)) {
628                            value = valueStr.toUpperCase();
629                            try {
630                                CoordinatorAction.Status.valueOf((String) value);
631                            } catch (IllegalArgumentException ex) {
632                                // Check for incorrect status value
633                                throw new CoordinatorEngineException(ErrorCode.E0421, filter,
634                                    XLog.format("invalid status value [{0}]." + " Valid status values are: [{1}]",
635                                        valueStr, StringUtils.join(CoordinatorAction.Status.values(), ", ")));
636                            }
637
638                            if (!(comp == FILTER_COMPARATORS.EQUALS || comp == FILTER_COMPARATORS.NOT_EQUALS)) {
639                                throw new CoordinatorEngineException(ErrorCode.E0421, filter,
640                                    XLog.format("invalid comparator [{0}] for status." + " Valid are = and !=",
641                                        comp.getSign()));
642                            }
643
644                            pair = Pair.of(OozieClient.FILTER_STATUS, comp);
645                        } else if (key.equalsIgnoreCase(OozieClient.FILTER_NOMINAL_TIME)) {
646                            try {
647                                value = new Timestamp(DateUtils.parseDateUTC(valueStr).getTime());
648                            } catch (ParseException e) {
649                                throw new CoordinatorEngineException(ErrorCode.E0421, filter,
650                                    XLog.format("invalid nominal time [{0}]." + " Valid format: " +
651                                            "[{1}]", valueStr, DateUtils.ISO8601_UTC_MASK));
652                            }
653                            pair = Pair.of(OozieClient.FILTER_NOMINAL_TIME, comp);
654                        } else {
655                            // Check for incorrect filter option
656                            throw new CoordinatorEngineException(ErrorCode.E0421, filter,
657                                XLog.format("invalid filter [{0}]." + " Valid filters [{1}]", key, StringUtils.join
658                                    (VALID_JOB_FILTERS, ", ")));
659                        }
660                        if (!filterMap.containsKey(pair)) {
661                            filterMap.put(pair, new ArrayList<Object>());
662                        }
663                        filterMap.get(pair).add(value);
664                        break;
665                    }
666                }
667
668                if (pair == null) {
669                    //token doesn't contain comparator
670                    throw new CoordinatorEngineException(ErrorCode.E0421, filter,
671                        "filter should be of format <key><comparator><value> pairs");
672                }
673            }
674        }
675        return filterMap;
676    }
677
678    /**
679     * @param filter the filter to parse. Elements must be semicolon-separated name=value pairs.
680     *               Supported names are in{@link CoordinatorEngine#FILTER_NAMES}.
681     * @return Map<String, List<String>> map of parsed filters
682     * @throws CoordinatorEngineException if the parameter could not be parsed
683     */
684    @VisibleForTesting
685    Map<String, List<String>> parseJobsFilter(String filter) throws CoordinatorEngineException {
686        Map<String, List<String>> map = new HashMap<String, List<String>>();
687        boolean isTimeUnitSpecified = false;
688        String timeUnit = "MINUTE";
689        boolean isFrequencySpecified = false;
690        String frequency = "";
691        if (filter != null) {
692            StringTokenizer st = new StringTokenizer(filter, ";");
693            while (st.hasMoreTokens()) {
694                String token = st.nextToken();
695                if (token.contains("=")) {
696                    String[] pair = token.split("=");
697                    if (pair.length != 2) {
698                        throw new CoordinatorEngineException(ErrorCode.E0420, filter,
699                                "elements must be semicolon-separated name=value pairs");
700                    }
701                    pair[0] = pair[0].toLowerCase();
702                    if (!FILTER_NAMES.contains(pair[0])) {
703                        throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
704                                pair[0]));
705                    }
706                    if (pair[0].equalsIgnoreCase("frequency")) {
707                        isFrequencySpecified = true;
708                        try {
709                            frequency = (int) Float.parseFloat(pair[1]) + "";
710                            continue;
711                        }
712                        catch (NumberFormatException NANException) {
713                            throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
714                                    "invalid value [{0}] for frequency. A numerical value is expected", pair[1]));
715                        }
716                    }
717                    if (pair[0].equalsIgnoreCase("unit")) {
718                        isTimeUnitSpecified = true;
719                        timeUnit = pair[1];
720                        if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days")
721                                && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) {
722                            throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
723                                    "invalid value [{0}] for time unit. "
724                                            + "Valid value is one of months, days, hours or minutes", pair[1]));
725                        }
726                        continue;
727                    }
728                    if (pair[0].equals("status")) {
729                        try {
730                            CoordinatorJob.Status.valueOf(pair[1]);
731                        }
732                        catch (IllegalArgumentException ex) {
733                            throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
734                                    "invalid status [{0}]", pair[1]));
735                        }
736                    }
737                    List<String> list = map.get(pair[0]);
738                    if (list == null) {
739                        list = new ArrayList<String>();
740                        map.put(pair[0], list);
741                    }
742                    list.add(pair[1]);
743                } else {
744                    throw new CoordinatorEngineException(ErrorCode.E0420, filter,
745                            "elements must be semicolon-separated name=value pairs");
746                }
747            }
748            // Unit is specified and frequency is not specified
749            if (!isFrequencySpecified && isTimeUnitSpecified) {
750                throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when "
751                        + "frequency is specified. Either specify frequency also or else remove the time unit");
752            } else if (isFrequencySpecified) {
753                // Frequency value is specified
754                if (isTimeUnitSpecified) {
755                    if (timeUnit.equalsIgnoreCase("months")) {
756                        timeUnit = "MONTH";
757                    } else if (timeUnit.equalsIgnoreCase("days")) {
758                        timeUnit = "DAY";
759                    } else if (timeUnit.equalsIgnoreCase("hours")) {
760                        // When job details are persisted to database, frequency in hours are converted to minutes.
761                        // This conversion is to conform with that.
762                        frequency = Integer.parseInt(frequency) * 60 + "";
763                        timeUnit = "MINUTE";
764                    } else if (timeUnit.equalsIgnoreCase("minutes")) {
765                        timeUnit = "MINUTE";
766                    }
767                }
768                // Adding the frequency and time unit filters to the filter map
769                List<String> list = new ArrayList<String>();
770                list.add(timeUnit);
771                map.put("unit", list);
772                list = new ArrayList<String>();
773                list.add(frequency);
774                map.put("frequency", list);
775            }
776        }
777        return map;
778    }
779
780    public List<WorkflowJobBean> getReruns(String coordActionId) throws CoordinatorEngineException {
781        List<WorkflowJobBean> wfBeans;
782        try {
783            wfBeans = WorkflowJobQueryExecutor.getInstance().getList(WorkflowJobQuery.GET_WORKFLOWS_PARENT_COORD_RERUN,
784                    coordActionId);
785        }
786        catch (JPAExecutorException e) {
787            throw new CoordinatorEngineException(e);
788        }
789        return wfBeans;
790    }
791
792    /**
793     * Update coord job definition.
794     *
795     * @param conf the conf
796     * @param jobId the job id
797     * @param dryrun the dryrun
798     * @param showDiff the show diff
799     * @return the string
800     * @throws CoordinatorEngineException the coordinator engine exception
801     */
802    public String updateJob(Configuration conf, String jobId, boolean dryrun, boolean showDiff)
803            throws CoordinatorEngineException {
804        try {
805            CoordUpdateXCommand update = new CoordUpdateXCommand(dryrun, conf, jobId, showDiff);
806            return update.call();
807        }
808        catch (CommandException ex) {
809            throw new CoordinatorEngineException(ex);
810        }
811    }
812
813    /**
814     * Return the status for a Job ID
815     *
816     * @param jobId job Id.
817     * @return the job's status
818     * @throws CoordinatorEngineException thrown if the job's status could not be obtained
819     */
820    @Override
821    public String getJobStatus(String jobId) throws CoordinatorEngineException {
822        try {
823            CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get(
824                    CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB_STATUS, jobId);
825            return coordJob.getStatusStr();
826        }
827        catch (JPAExecutorException e) {
828            throw new CoordinatorEngineException(e);
829        }
830    }
831
832    /**
833     * Return the status for an Action ID
834     *
835     * @param actionId action Id.
836     * @return the action's status
837     * @throws CoordinatorEngineException thrown if the action's status could not be obtained
838     */
839    public String getActionStatus(String actionId) throws CoordinatorEngineException {
840        try {
841            CoordinatorActionBean coordAction = CoordActionQueryExecutor.getInstance().get(
842                    CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION_STATUS, actionId);
843            return coordAction.getStatusStr();
844        }
845        catch (JPAExecutorException e) {
846            throw new CoordinatorEngineException(e);
847        }
848    }
849
850    @Override
851    public void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
852        try {
853            new CoordSLAAlertsDisableXCommand(id, actions, dates).call();
854
855        }
856        catch (CommandException e) {
857            throw new CoordinatorEngineException(e);
858        }
859    }
860
861    @Override
862    public void changeSLA(String id, String actions, String dates, String childIds, String newParams)
863            throws BaseEngineException {
864        Map<String, String> slaNewParams = null;
865
866        try {
867
868            if (newParams != null) {
869                slaNewParams = JobUtils.parseChangeValue(newParams);
870            }
871
872            new CoordSLAChangeXCommand(id, actions, dates, slaNewParams).call();
873
874        }
875        catch (CommandException e) {
876            throw new CoordinatorEngineException(e);
877        }
878    }
879
880    @Override
881    public void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
882        try {
883            new CoordSLAAlertsEnableXCommand(id, actions, dates).call();
884
885        }
886        catch (CommandException e) {
887            throw new CoordinatorEngineException(e);
888        }
889    }
890
891    /**
892     * return a list of killed Coordinator job
893     *
894     * @param filter the filter string for which the coordinator jobs are killed
895     * @param start the starting index for coordinator jobs
896     * @param length maximum number of jobs to be killed
897     * @return coordinatorJobInfo the list of jobs being killed
898     * @throws CoordinatorEngineException thrown if one or more of the jobs cannot be killed
899     */
900    public CoordinatorJobInfo killJobs(String filter, int start, int length) throws CoordinatorEngineException {
901        try {
902            Map<String, List<String>> filterMap = parseJobsFilter(filter);
903            CoordinatorJobInfo coordinatorJobInfo =
904                    new BulkCoordXCommand(filterMap, start, length, OperationType.Kill).call();
905            if (coordinatorJobInfo == null) {
906                return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0);
907            }
908            return coordinatorJobInfo;
909        }
910        catch (CommandException ex) {
911            throw new CoordinatorEngineException(ex);
912        }
913    }
914
915    /**
916     * return the jobs that've been suspended
917     * @param filter Filter for jobs that will be suspended, can be name, user, group, status, id or combination of any
918     * @param start Offset for the jobs that will be suspended
919     * @param length maximum number of jobs that will be suspended
920     * @return coordinatorJobInfo
921     * @throws CoordinatorEngineException if the jobs could not be suspended
922     */
923    public CoordinatorJobInfo suspendJobs(String filter, int start, int length) throws CoordinatorEngineException {
924        try {
925            Map<String, List<String>> filterMap = parseJobsFilter(filter);
926            CoordinatorJobInfo coordinatorJobInfo =
927                    new BulkCoordXCommand(filterMap, start, length, OperationType.Suspend).call();
928            if (coordinatorJobInfo == null) {
929                return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0);
930            }
931            return coordinatorJobInfo;
932        }
933        catch (CommandException ex) {
934            throw new CoordinatorEngineException(ex);
935        }
936    }
937
938    /**
939     * return the jobs that've been resumed
940     * @param filter Filter for jobs that will be resumed, can be name, user, group, status, id or combination of any
941     * @param start Offset for the jobs that will be resumed
942     * @param length maximum number of jobs that will be resumed
943     * @return coordinatorJobInfo returns resumed jobs
944     * @throws CoordinatorEngineException if the jobs could not be resumed
945     */
946    public CoordinatorJobInfo resumeJobs(String filter, int start, int length) throws CoordinatorEngineException {
947        try {
948            Map<String, List<String>> filterMap = parseJobsFilter(filter);
949            CoordinatorJobInfo coordinatorJobInfo =
950                    new BulkCoordXCommand(filterMap, start, length, OperationType.Resume).call();
951            if (coordinatorJobInfo == null) {
952                return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0);
953            }
954            return coordinatorJobInfo;
955        }
956        catch (CommandException ex) {
957            throw new CoordinatorEngineException(ex);
958        }
959    }
960    /**
961     * Get coord action missing dependencies
962     * @param id jobID
963     * @param actions action list
964     * @param dates nominal time list
965     * @return CoordActionMissingDependenciesXCommand pair of coord action bean and
966     * list of missing input dependencies.
967     * @throws CommandException if the actions could not be retrieved
968     */
969    public List<Pair<CoordinatorActionBean, Map<String, ActionDependency>>> getCoordActionMissingDependencies(String id,
970            String actions, String dates) throws CommandException {
971        return new CoordActionMissingDependenciesXCommand(id, actions, dates).call();
972    }
973
974    /**
975     * get wf actions by action name in a coordinator job
976     * @param jobId coordinator job id
977     * @param wfActionName workflow action name
978     * @param offset offset in the coordinator job
979     * @param len maximum number of results
980     * @return CoordWfActionInfoXCommand list of CoordinatorWfActionBean in a coordinator
981     * @throws CoordinatorEngineException if the actions could not be retrieved
982     */
983     public List<CoordinatorWfActionBean> getWfActionByJobIdAndName(String jobId, String wfActionName, int offset, int len)
984             throws CoordinatorEngineException {
985        try {
986            return new CoordWfActionInfoXCommand(jobId, wfActionName, offset, len).call();
987        }
988        catch (CommandException ex) {
989            throw new CoordinatorEngineException(ex);
990        }
991     }
992}