001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import java.io.IOException;
021    import java.io.Writer;
022    import java.util.ArrayList;
023    import java.util.Date;
024    import java.util.HashMap;
025    import java.util.HashSet;
026    import java.util.Iterator;
027    import java.util.LinkedHashSet;
028    import java.util.List;
029    import java.util.Map;
030    import java.util.Set;
031    import java.util.StringTokenizer;
032    
033    import org.apache.hadoop.conf.Configuration;
034    import org.apache.oozie.client.CoordinatorAction;
035    import org.apache.oozie.client.CoordinatorJob;
036    import org.apache.oozie.client.OozieClient;
037    import org.apache.oozie.client.WorkflowJob;
038    import org.apache.oozie.client.rest.RestConstants;
039    import org.apache.oozie.command.CommandException;
040    import org.apache.oozie.command.coord.CoordActionInfoXCommand;
041    import org.apache.oozie.util.CoordActionsInDateRange;
042    import org.apache.oozie.command.coord.CoordChangeXCommand;
043    import org.apache.oozie.command.coord.CoordJobXCommand;
044    import org.apache.oozie.command.coord.CoordJobsXCommand;
045    import org.apache.oozie.command.coord.CoordKillXCommand;
046    import org.apache.oozie.command.coord.CoordRerunXCommand;
047    import org.apache.oozie.command.coord.CoordResumeXCommand;
048    import org.apache.oozie.command.coord.CoordSubmitXCommand;
049    import org.apache.oozie.command.coord.CoordSuspendXCommand;
050    import org.apache.oozie.service.DagXLogInfoService;
051    import org.apache.oozie.service.Services;
052    import org.apache.oozie.service.XLogService;
053    import org.apache.oozie.util.ParamChecker;
054    import org.apache.oozie.util.XLog;
055    import org.apache.oozie.util.XLogStreamer;
056    
057    import com.google.common.annotations.VisibleForTesting;
058    
059    public class CoordinatorEngine extends BaseEngine {
060        private static XLog LOG = XLog.getLog(CoordinatorEngine.class);
061    
062        /**
063         * Create a system Coordinator engine, with no user and no group.
064         */
065        public CoordinatorEngine() {
066            if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
067                LOG.debug("Oozie CoordinatorEngine is not using XCommands.");
068            }
069            else {
070                LOG.debug("Oozie CoordinatorEngine is using XCommands.");
071            }
072        }
073    
074        /**
075         * Create a Coordinator engine to perform operations on behave of a user.
076         *
077         * @param user user name.
078         */
079        public CoordinatorEngine(String user) {
080            this();
081            this.user = ParamChecker.notEmpty(user, "user");
082        }
083    
084        /*
085         * (non-Javadoc)
086         *
087         * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
088         */
089        @Override
090        public String getDefinition(String jobId) throws BaseEngineException {
091            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
092            return job.getOrigJobXml();
093        }
094    
095        /**
096         * @param jobId
097         * @return CoordinatorJobBean
098         * @throws BaseEngineException
099         */
100        private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException {
101            try {
102                return new CoordJobXCommand(jobId).call();
103            }
104            catch (CommandException ex) {
105                throw new BaseEngineException(ex);
106            }
107        }
108    
109        /**
110         * @param actionId
111         * @return CoordinatorActionBean
112         * @throws BaseEngineException
113         */
114        public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException {
115            try {
116                return new CoordActionInfoXCommand(actionId).call();
117            }
118            catch (CommandException ex) {
119                throw new BaseEngineException(ex);
120            }
121        }
122    
123        /*
124         * (non-Javadoc)
125         *
126         * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
127         */
128        @Override
129        public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException {
130            try {
131                return new CoordJobXCommand(jobId).call();
132            }
133            catch (CommandException ex) {
134                throw new BaseEngineException(ex);
135            }
136        }
137    
138        /*
139         * (non-Javadoc)
140         *
141         * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int)
142         */
143        @Override
144        public CoordinatorJobBean getCoordJob(String jobId, String filter, int start, int length, boolean desc)
145                throws BaseEngineException {
146            List<String> filterList = parseStatusFilter(filter);
147            try {
148                return new CoordJobXCommand(jobId, filterList, start, length, desc)
149                        .call();
150            }
151            catch (CommandException ex) {
152                throw new BaseEngineException(ex);
153            }
154        }
155    
156        /*
157         * (non-Javadoc)
158         *
159         * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
160         */
161        @Override
162        public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException {
163            return null;
164        }
165    
166        /*
167         * (non-Javadoc)
168         *
169         * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
170         */
171        @Override
172        public void kill(String jobId) throws CoordinatorEngineException {
173            try {
174                new CoordKillXCommand(jobId).call();
175                LOG.info("User " + user + " killed the Coordinator job " + jobId);
176            }
177            catch (CommandException e) {
178                throw new CoordinatorEngineException(e);
179            }
180        }
181    
182        /* (non-Javadoc)
183         * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
184         */
185        @Override
186        public void change(String jobId, String changeValue) throws CoordinatorEngineException {
187            try {
188                new CoordChangeXCommand(jobId, changeValue).call();
189                LOG.info("User " + user + " changed the Coordinator job " + jobId + " to " + changeValue);
190            }
191            catch (CommandException e) {
192                throw new CoordinatorEngineException(e);
193            }
194        }
195    
196        @Override
197        @Deprecated
198        public void reRun(String jobId, Configuration conf) throws BaseEngineException {
199            throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of rerun"));
200        }
201    
202        /**
203         * Rerun coordinator actions for given rerunType
204         *
205         * @param jobId
206         * @param rerunType
207         * @param scope
208         * @param refresh
209         * @param noCleanup
210         * @throws BaseEngineException
211         */
212        public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup)
213                throws BaseEngineException {
214            try {
215                return new CoordRerunXCommand(jobId, rerunType, scope, refresh,
216                        noCleanup).call();
217            }
218            catch (CommandException ex) {
219                throw new BaseEngineException(ex);
220            }
221        }
222    
223        /*
224         * (non-Javadoc)
225         *
226         * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
227         */
228        @Override
229        public void resume(String jobId) throws CoordinatorEngineException {
230            try {
231                new CoordResumeXCommand(jobId).call();
232            }
233            catch (CommandException e) {
234                throw new CoordinatorEngineException(e);
235            }
236        }
237    
238        @Override
239        @Deprecated
240        public void start(String jobId) throws BaseEngineException {
241            throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start"));
242        }
243    
244        /*
245         * (non-Javadoc)
246         *
247         * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String,
248         * java.io.Writer)
249         */
250        @Override
251        public void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException {
252            XLogStreamer.Filter filter = new XLogStreamer.Filter();
253            filter.setParameter(DagXLogInfoService.JOB, jobId);
254    
255            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
256            Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
257        }
258    
259        /**
260         * Add list of actions to the filter based on conditions
261         *
262         * @param jobId Job Id
263         * @param logRetrievalScope Value for the retrieval type
264         * @param logRetrievalType Based on which filter criteria the log is retrieved
265         * @param writer writer to stream the log to
266         * @throws IOException
267         * @throws BaseEngineException
268         * @throws CommandException
269         */
270        public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer)
271                throws IOException, BaseEngineException, CommandException {
272            XLogStreamer.Filter filter = new XLogStreamer.Filter();
273            filter.setParameter(DagXLogInfoService.JOB, jobId);
274            if (logRetrievalScope != null && logRetrievalType != null) {
275                // if coordinator action logs are to be retrieved based on action id range
276                if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) {
277                    // Use set implementation that maintains order or elements to achieve reproducibility:
278                    Set<String> actionSet = new LinkedHashSet<String>();
279                    String[] list = logRetrievalScope.split(",");
280                    for (String s : list) {
281                        s = s.trim();
282                        if (s.contains("-")) {
283                            String[] range = s.split("-");
284                            if (range.length != 2) {
285                                throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
286                                        + "'");
287                            }
288                            int start;
289                            int end;
290                            try {
291                                start = Integer.parseInt(range[0].trim());
292                            } catch (NumberFormatException ne) {
293                                throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer",
294                                        ne);
295                            }
296                            try {
297                                end = Integer.parseInt(range[1].trim());
298                            } catch (NumberFormatException ne) {
299                                throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer",
300                                        ne);
301                            }
302                            if (start > end) {
303                                throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
304                            }
305                            for (int i = start; i <= end; i++) {
306                                actionSet.add(jobId + "@" + i);
307                            }
308                        }
309                        else {
310                            try {
311                                Integer.parseInt(s);
312                            }
313                            catch (NumberFormatException ne) {
314                                throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
315                                        + "'. Integer only.");
316                            }
317                            actionSet.add(jobId + "@" + s);
318                        }
319                    }
320    
321                    Iterator<String> actionsIterator = actionSet.iterator();
322                    StringBuilder orSeparatedActions = new StringBuilder("");
323                    boolean orRequired = false;
324                    while (actionsIterator.hasNext()) {
325                        if (orRequired) {
326                            orSeparatedActions.append("|");
327                        }
328                        orSeparatedActions.append(actionsIterator.next().toString());
329                        orRequired = true;
330                    }
331                    if (actionSet.size() > 1 && orRequired) {
332                        orSeparatedActions.insert(0, "(");
333                        orSeparatedActions.append(")");
334                    }
335                    filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
336                }
337                // if coordinator action logs are to be retrieved based on date range
338                // this block gets the corresponding list of coordinator actions to be used by the log filter
339                if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) {
340                    List<String> coordActionIdList = null;
341                    try {
342                        coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId, logRetrievalScope);
343                    }
344                    catch (XException xe) {
345                        throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe);
346                    }
347                    StringBuilder orSeparatedActions = new StringBuilder("");
348                    boolean orRequired = false;
349                    for (String coordActionId : coordActionIdList) {
350                        if (orRequired) {
351                            orSeparatedActions.append("|");
352                        }
353                        orSeparatedActions.append(coordActionId);
354                        orRequired = true;
355                    }
356                    if (coordActionIdList.size() > 1 && orRequired) {
357                        orSeparatedActions.insert(0, "(");
358                        orSeparatedActions.append(")");
359                    }
360                    filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
361                }
362            }
363            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
364            Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
365        }
366    
367        /*
368         * (non-Javadoc)
369         *
370         * @see
371         * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration
372         * , boolean)
373         */
374        @Override
375        public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException {
376            try {
377                CoordSubmitXCommand submit = new CoordSubmitXCommand(conf);
378                return submit.call();
379            }
380            catch (CommandException ex) {
381                throw new CoordinatorEngineException(ex);
382            }
383        }
384    
385        /*
386         * (non-Javadoc)
387         *
388         * @see
389         * org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
390         */
391        @Override
392        public String dryRunSubmit(Configuration conf) throws CoordinatorEngineException {
393            try {
394                CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf);
395                return submit.call();
396            }
397            catch (CommandException ex) {
398                throw new CoordinatorEngineException(ex);
399            }
400        }
401    
402        /*
403         * (non-Javadoc)
404         *
405         * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
406         */
407        @Override
408        public void suspend(String jobId) throws CoordinatorEngineException {
409            try {
410                new CoordSuspendXCommand(jobId).call();
411            }
412            catch (CommandException e) {
413                throw new CoordinatorEngineException(e);
414            }
415    
416        }
417    
418        /*
419         * (non-Javadoc)
420         *
421         * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
422         */
423        @Override
424        public WorkflowJob getJob(String jobId) throws BaseEngineException {
425            throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
426        }
427    
428        /*
429         * (non-Javadoc)
430         *
431         * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
432         */
433        @Override
434        public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException {
435            throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
436        }
437    
438        private static final Set<String> FILTER_NAMES = new HashSet<String>();
439    
440        static {
441            FILTER_NAMES.add(OozieClient.FILTER_USER);
442            FILTER_NAMES.add(OozieClient.FILTER_NAME);
443            FILTER_NAMES.add(OozieClient.FILTER_GROUP);
444            FILTER_NAMES.add(OozieClient.FILTER_STATUS);
445            FILTER_NAMES.add(OozieClient.FILTER_ID);
446            FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY);
447            FILTER_NAMES.add(OozieClient.FILTER_UNIT);
448        }
449    
450        /**
451         * @param filter
452         * @param start
453         * @param len
454         * @return CoordinatorJobInfo
455         * @throws CoordinatorEngineException
456         */
457        public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException {
458            Map<String, List<String>> filterList = parseFilter(filter);
459    
460            try {
461                return new CoordJobsXCommand(filterList, start, len).call();
462            }
463            catch (CommandException ex) {
464                throw new CoordinatorEngineException(ex);
465            }
466        }
467    
468    
469        // Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values
470        private List<String> parseStatusFilter(String filter) throws CoordinatorEngineException {
471            List<String> filterList = new ArrayList<String>();
472            if (filter != null) {
473                //split name;value pairs
474                StringTokenizer st = new StringTokenizer(filter, ";");
475                while (st.hasMoreTokens()) {
476                    String token = st.nextToken();
477                    if (token.contains("=")) {
478                        String[] pair = token.split("=");
479                        if (pair.length != 2) {
480                            throw new CoordinatorEngineException(ErrorCode.E0421, token,
481                                    "elements must be name=value pairs");
482                        }
483                        if (pair[0].equalsIgnoreCase("status")) {
484                            String statusValue = pair[1];
485                            try {
486                                CoordinatorAction.Status.valueOf(statusValue);
487                            } catch (IllegalArgumentException ex) {
488                                StringBuilder validStatusList = new StringBuilder();
489                                for (CoordinatorAction.Status status: CoordinatorAction.Status.values()){
490                                    validStatusList.append(status.toString()+" ");
491                                }
492                                // Check for incorrect status value
493                                throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
494                                    "invalid status value [{0}]." + " Valid status values are: [{1}]", statusValue, validStatusList));
495                            }
496                            filterList.add(statusValue);
497                        } else {
498                            // Check for incorrect filter option
499                            throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
500                                    "invalid filter [{0}]." + " The only valid filter is \"status\"", pair[0]));
501                        }
502                    } else {
503                        throw new CoordinatorEngineException(ErrorCode.E0421, token,
504                                 "elements must be name=value pairs");
505                    }
506                }
507            }
508            return filterList;
509        }
510    
511        /**
512         * @param filter
513         * @return Map<String, List<String>>
514         * @throws CoordinatorEngineException
515         */
516        @VisibleForTesting
517        Map<String, List<String>> parseFilter(String filter) throws CoordinatorEngineException {
518            Map<String, List<String>> map = new HashMap<String, List<String>>();
519            boolean isTimeUnitSpecified = false;
520            String timeUnit = "MINUTE";
521            boolean isFrequencySpecified = false;
522            String frequency = "";
523            if (filter != null) {
524                StringTokenizer st = new StringTokenizer(filter, ";");
525                while (st.hasMoreTokens()) {
526                    String token = st.nextToken();
527                    if (token.contains("=")) {
528                        String[] pair = token.split("=");
529                        if (pair.length != 2) {
530                            throw new CoordinatorEngineException(ErrorCode.E0420, filter,
531                                    "elements must be name=value pairs");
532                        }
533                        if (!FILTER_NAMES.contains(pair[0].toLowerCase())) {
534                            throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
535                                    pair[0]));
536                        }
537                        if (pair[0].equalsIgnoreCase("frequency")) {
538                            isFrequencySpecified = true;
539                            try {
540                                frequency = (int) Float.parseFloat(pair[1]) + "";
541                                continue;
542                            }
543                            catch (NumberFormatException NANException) {
544                                throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
545                                        "invalid value [{0}] for frequency. A numerical value is expected", pair[1]));
546                            }
547                        }
548                        if (pair[0].equalsIgnoreCase("unit")) {
549                            isTimeUnitSpecified = true;
550                            timeUnit = pair[1];
551                            if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days")
552                                    && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) {
553                                throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
554                                        "invalid value [{0}] for time unit. "
555                                                + "Valid value is one of months, days, hours or minutes", pair[1]));
556                            }
557                            continue;
558                        }
559                        if (pair[0].equals("status")) {
560                            try {
561                                CoordinatorJob.Status.valueOf(pair[1]);
562                            }
563                            catch (IllegalArgumentException ex) {
564                                throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
565                                        "invalid status [{0}]", pair[1]));
566                            }
567                        }
568                        List<String> list = map.get(pair[0]);
569                        if (list == null) {
570                            list = new ArrayList<String>();
571                            map.put(pair[0], list);
572                        }
573                        list.add(pair[1]);
574                    } else {
575                        throw new CoordinatorEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
576                    }
577                }
578                // Unit is specified and frequency is not specified
579                if (!isFrequencySpecified && isTimeUnitSpecified) {
580                    throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when "
581                            + "frequency is specified. Either specify frequency also or else remove the time unit");
582                } else if (isFrequencySpecified) {
583                    // Frequency value is specified
584                    if (isTimeUnitSpecified) {
585                        if (timeUnit.equalsIgnoreCase("months")) {
586                            timeUnit = "MONTH";
587                        } else if (timeUnit.equalsIgnoreCase("days")) {
588                            timeUnit = "DAY";
589                        } else if (timeUnit.equalsIgnoreCase("hours")) {
590                            // When job details are persisted to database, frequency in hours are converted to minutes.
591                            // This conversion is to conform with that.
592                            frequency = Integer.parseInt(frequency) * 60 + "";
593                            timeUnit = "MINUTE";
594                        } else if (timeUnit.equalsIgnoreCase("minutes")) {
595                            timeUnit = "MINUTE";
596                        }
597                    }
598                    // Adding the frequency and time unit filters to the filter map
599                    List<String> list = new ArrayList<String>();
600                    list.add(timeUnit);
601                    map.put("unit", list);
602                    list = new ArrayList<String>();
603                    list.add(frequency);
604                    map.put("frequency", list);
605                }
606            }
607            return map;
608        }
609    }