001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import java.io.IOException;
021    import java.io.Writer;
022    import java.util.ArrayList;
023    import java.util.Date;
024    import java.util.HashMap;
025    import java.util.HashSet;
026    import java.util.Iterator;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.StringTokenizer;
031    import org.apache.hadoop.conf.Configuration;
032    import org.apache.oozie.client.CoordinatorAction;
033    import org.apache.oozie.client.CoordinatorJob;
034    import org.apache.oozie.client.OozieClient;
035    import org.apache.oozie.client.WorkflowJob;
036    import org.apache.oozie.client.rest.RestConstants;
037    import org.apache.oozie.command.CommandException;
038    import org.apache.oozie.command.coord.CoordActionInfoXCommand;
039    import org.apache.oozie.util.CoordActionsInDateRange;
040    import org.apache.oozie.command.coord.CoordChangeXCommand;
041    import org.apache.oozie.command.coord.CoordJobXCommand;
042    import org.apache.oozie.command.coord.CoordJobsXCommand;
043    import org.apache.oozie.command.coord.CoordKillXCommand;
044    import org.apache.oozie.command.coord.CoordRerunXCommand;
045    import org.apache.oozie.command.coord.CoordResumeXCommand;
046    import org.apache.oozie.command.coord.CoordSubmitXCommand;
047    import org.apache.oozie.command.coord.CoordSuspendXCommand;
048    import org.apache.oozie.service.DagXLogInfoService;
049    import org.apache.oozie.service.Services;
050    import org.apache.oozie.service.XLogService;
051    import org.apache.oozie.util.ParamChecker;
052    import org.apache.oozie.util.XLog;
053    import org.apache.oozie.util.XLogStreamer;
054    
055    public class CoordinatorEngine extends BaseEngine {
056        private static XLog LOG = XLog.getLog(CoordinatorEngine.class);
057    
058        /**
059         * Create a system Coordinator engine, with no user and no group.
060         */
061        public CoordinatorEngine() {
062            if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
063                LOG.debug("Oozie CoordinatorEngine is not using XCommands.");
064            }
065            else {
066                LOG.debug("Oozie CoordinatorEngine is using XCommands.");
067            }
068        }
069    
070        /**
071         * Create a Coordinator engine to perform operations on behave of a user.
072         *
073         * @param user user name.
074         * @param authToken the authentication token.
075         */
076        public CoordinatorEngine(String user, String authToken) {
077            this();
078            this.user = ParamChecker.notEmpty(user, "user");
079            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
080        }
081    
082        /*
083         * (non-Javadoc)
084         *
085         * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
086         */
087        @Override
088        public String getDefinition(String jobId) throws BaseEngineException {
089            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
090            return job.getOrigJobXml();
091        }
092    
093        /**
094         * @param jobId
095         * @return CoordinatorJobBean
096         * @throws BaseEngineException
097         */
098        private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException {
099            try {
100                            return new CoordJobXCommand(jobId).call();
101            }
102            catch (CommandException ex) {
103                throw new BaseEngineException(ex);
104            }
105        }
106    
107        /**
108         * @param actionId
109         * @return CoordinatorActionBean
110         * @throws BaseEngineException
111         */
112        public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException {
113            try {
114                            return new CoordActionInfoXCommand(actionId).call();
115            }
116            catch (CommandException ex) {
117                throw new BaseEngineException(ex);
118            }
119        }
120    
121        /*
122         * (non-Javadoc)
123         *
124         * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
125         */
126        @Override
127        public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException {
128            try {
129                            return new CoordJobXCommand(jobId).call();
130            }
131            catch (CommandException ex) {
132                throw new BaseEngineException(ex);
133            }
134        }
135    
136        /*
137         * (non-Javadoc)
138         *
139         * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int)
140         */
141        @Override
142        public CoordinatorJobBean getCoordJob(String jobId, String filter, int start, int length) throws BaseEngineException {
143            List<String> filterList = parseStatusFilter(filter);
144            try {
145                            return new CoordJobXCommand(jobId, filterList, start, length)
146                                            .call();
147            }
148            catch (CommandException ex) {
149                throw new BaseEngineException(ex);
150            }
151        }
152    
153        /*
154         * (non-Javadoc)
155         *
156         * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
157         */
158        @Override
159        public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException {
160            return null;
161        }
162    
163        /*
164         * (non-Javadoc)
165         *
166         * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
167         */
168        @Override
169        public void kill(String jobId) throws CoordinatorEngineException {
170            try {
171                            new CoordKillXCommand(jobId).call();
172                LOG.info("User " + user + " killed the Coordinator job " + jobId);
173            }
174            catch (CommandException e) {
175                throw new CoordinatorEngineException(e);
176            }
177        }
178    
179        /* (non-Javadoc)
180         * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
181         */
182        @Override
183        public void change(String jobId, String changeValue) throws CoordinatorEngineException {
184            try {
185                new CoordChangeXCommand(jobId, changeValue).call();
186                LOG.info("User " + user + " changed the Coordinator job " + jobId + " to " + changeValue);
187            }
188            catch (CommandException e) {
189                throw new CoordinatorEngineException(e);
190            }
191        }
192    
193        @Override
194        @Deprecated
195        public void reRun(String jobId, Configuration conf) throws BaseEngineException {
196            throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of rerun"));
197        }
198    
199        /**
200         * Rerun coordinator actions for given rerunType
201         *
202         * @param jobId
203         * @param rerunType
204         * @param scope
205         * @param refresh
206         * @param noCleanup
207         * @throws BaseEngineException
208         */
209        public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup)
210                throws BaseEngineException {
211            try {
212                            return new CoordRerunXCommand(jobId, rerunType, scope, refresh,
213                                            noCleanup).call();
214            }
215            catch (CommandException ex) {
216                throw new BaseEngineException(ex);
217            }
218        }
219    
220        /*
221         * (non-Javadoc)
222         *
223         * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
224         */
225        @Override
226        public void resume(String jobId) throws CoordinatorEngineException {
227            try {
228                            new CoordResumeXCommand(jobId).call();
229            }
230            catch (CommandException e) {
231                throw new CoordinatorEngineException(e);
232            }
233        }
234    
235        @Override
236        @Deprecated
237        public void start(String jobId) throws BaseEngineException {
238            throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start"));
239        }
240    
241        /*
242         * (non-Javadoc)
243         *
244         * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String,
245         * java.io.Writer)
246         */
247        @Override
248        public void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException {
249            XLogStreamer.Filter filter = new XLogStreamer.Filter();
250            filter.setParameter(DagXLogInfoService.JOB, jobId);
251    
252            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
253            Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
254        }
255    
256        /**
257         * Add list of actions to the filter based on conditions
258         *
259         * @param jobId Job Id
260         * @param logRetrievalScope Value for the retrieval type
261         * @param logRetrievalType Based on which filter criteria the log is retrieved
262         * @param writer writer to stream the log to
263         * @throws IOException
264         * @throws BaseEngineException
265         * @throws CommandException
266         */
267        public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer)
268                throws IOException, BaseEngineException, CommandException {
269            XLogStreamer.Filter filter = new XLogStreamer.Filter();
270            filter.setParameter(DagXLogInfoService.JOB, jobId);
271            if (logRetrievalScope != null && logRetrievalType != null) {
272                // if coordinator action logs are to be retrieved based on action id range
273                if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) {
274                    Set<String> actions = new HashSet<String>();
275                    String[] list = logRetrievalScope.split(",");
276                    for (String s : list) {
277                        s = s.trim();
278                        if (s.contains("-")) {
279                            String[] range = s.split("-");
280                            if (range.length != 2) {
281                                throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
282                                        + "'");
283                            }
284                            int start;
285                            int end;
286                            try {
287                                start = Integer.parseInt(range[0].trim());
288                            } catch (NumberFormatException ne) {
289                                throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer",
290                                        ne);
291                            }
292                            try {
293                                end = Integer.parseInt(range[1].trim());
294                            } catch (NumberFormatException ne) {
295                                throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer",
296                                        ne);
297                            }
298                            if (start > end) {
299                                throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
300                            }
301                            for (int i = start; i <= end; i++) {
302                                actions.add(jobId + "@" + i);
303                            }
304                        }
305                        else {
306                            try {
307                                Integer.parseInt(s);
308                            }
309                            catch (NumberFormatException ne) {
310                                throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
311                                        + "'. Integer only.");
312                            }
313                            actions.add(jobId + "@" + s);
314                        }
315                    }
316    
317                    Iterator<String> actionsIterator = actions.iterator();
318                    StringBuilder orSeparatedActions = new StringBuilder("");
319                    boolean orRequired = false;
320                    while (actionsIterator.hasNext()) {
321                        if (orRequired) {
322                            orSeparatedActions.append("|");
323                        }
324                        orSeparatedActions.append(actionsIterator.next().toString());
325                        orRequired = true;
326                    }
327                    if (actions.size() > 1 && orRequired) {
328                        orSeparatedActions.insert(0, "(");
329                        orSeparatedActions.append(")");
330                    }
331                    filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
332                }
333                // if coordinator action logs are to be retrieved based on date range
334                // this block gets the corresponding list of coordinator actions to be used by the log filter
335                if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) {
336                    List<String> coordActionIdList = null;
337                    try {
338                        coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId, logRetrievalScope);
339                    }
340                    catch (XException xe) {
341                        throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe);
342                    }
343                    StringBuilder orSeparatedActions = new StringBuilder("");
344                    boolean orRequired = false;
345                    for (String coordActionId : coordActionIdList) {
346                        if (orRequired) {
347                            orSeparatedActions.append("|");
348                        }
349                        orSeparatedActions.append(coordActionId);
350                        orRequired = true;
351                    }
352                    if (coordActionIdList.size() > 1 && orRequired) {
353                        orSeparatedActions.insert(0, "(");
354                        orSeparatedActions.append(")");
355                    }
356                    filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
357                }
358            }
359            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
360            Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
361        }
362    
363        /*
364         * (non-Javadoc)
365         *
366         * @see
367         * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration
368         * , boolean)
369         */
370        @Override
371        public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException {
372            try {
373                            CoordSubmitXCommand submit = new CoordSubmitXCommand(conf,
374                                            getAuthToken());
375                            return submit.call();
376            }
377            catch (CommandException ex) {
378                throw new CoordinatorEngineException(ex);
379            }
380        }
381    
382        /*
383         * (non-Javadoc)
384         *
385         * @see
386         * org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
387         */
388        @Override
389        public String dryRunSubmit(Configuration conf) throws CoordinatorEngineException {
390            try {
391                            CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf,
392                                            getAuthToken());
393                            return submit.call();
394            }
395            catch (CommandException ex) {
396                throw new CoordinatorEngineException(ex);
397            }
398        }
399    
400        /*
401         * (non-Javadoc)
402         *
403         * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
404         */
405        @Override
406        public void suspend(String jobId) throws CoordinatorEngineException {
407            try {
408                            new CoordSuspendXCommand(jobId).call();
409            }
410            catch (CommandException e) {
411                throw new CoordinatorEngineException(e);
412            }
413    
414        }
415    
416        /*
417         * (non-Javadoc)
418         *
419         * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
420         */
421        @Override
422        public WorkflowJob getJob(String jobId) throws BaseEngineException {
423            throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
424        }
425    
426        /*
427         * (non-Javadoc)
428         *
429         * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
430         */
431        @Override
432        public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException {
433            throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
434        }
435    
436        private static final Set<String> FILTER_NAMES = new HashSet<String>();
437    
438        static {
439            FILTER_NAMES.add(OozieClient.FILTER_USER);
440            FILTER_NAMES.add(OozieClient.FILTER_NAME);
441            FILTER_NAMES.add(OozieClient.FILTER_GROUP);
442            FILTER_NAMES.add(OozieClient.FILTER_STATUS);
443            FILTER_NAMES.add(OozieClient.FILTER_ID);
444            FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY);
445            FILTER_NAMES.add(OozieClient.FILTER_UNIT);
446        }
447    
448        /**
449         * @param filter
450         * @param start
451         * @param len
452         * @return CoordinatorJobInfo
453         * @throws CoordinatorEngineException
454         */
455        public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException {
456            Map<String, List<String>> filterList = parseFilter(filter);
457    
458            try {
459                            return new CoordJobsXCommand(filterList, start, len).call();
460            }
461            catch (CommandException ex) {
462                throw new CoordinatorEngineException(ex);
463            }
464        }
465    
466    
467        // Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values
468        private List<String> parseStatusFilter(String filter) throws CoordinatorEngineException {
469            List<String> filterList = new ArrayList<String>();
470            if (filter != null) {
471                //split name;value pairs
472                StringTokenizer st = new StringTokenizer(filter, ";");
473                while (st.hasMoreTokens()) {
474                    String token = st.nextToken();
475                    if (token.contains("=")) {
476                        String[] pair = token.split("=");
477                        if (pair.length != 2) {
478                            throw new CoordinatorEngineException(ErrorCode.E0421, token,
479                                    "elements must be name=value pairs");
480                        }
481                        if (pair[0].equalsIgnoreCase("status")) {
482                            String statusValue = pair[1];
483                            try {
484                                CoordinatorAction.Status.valueOf(statusValue);
485                            } catch (IllegalArgumentException ex) {
486                                StringBuilder validStatusList = new StringBuilder();
487                                for (CoordinatorAction.Status status: CoordinatorAction.Status.values()){
488                                    validStatusList.append(status.toString()+" ");
489                                }
490                                // Check for incorrect status value
491                                throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
492                                        "invalid status value [{0}]." + " Valid status values are: [{1}]", statusValue, validStatusList));
493                            }
494                            filterList.add(statusValue);
495                        } else {
496                            // Check for incorrect filter option
497                            throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
498                                    "invalid filter [{0}]." + " The only valid filter is \"status\"", pair[0]));
499                        }
500                    } else {
501                        throw new CoordinatorEngineException(ErrorCode.E0421, token,
502                                 "elements must be name=value pairs");
503                    }
504                }
505            }
506            return filterList;
507        }
508    
509        /**
510         * @param filter
511         * @return Map<String, List<String>>
512         * @throws CoordinatorEngineException
513         */
514        private Map<String, List<String>> parseFilter(String filter) throws CoordinatorEngineException {
515            Map<String, List<String>> map = new HashMap<String, List<String>>();
516            boolean isTimeUnitSpecified = false;
517            String timeUnit = "MINUTE";
518            boolean isFrequencySpecified = false;
519            String frequency = "";
520            if (filter != null) {
521                StringTokenizer st = new StringTokenizer(filter, ";");
522                while (st.hasMoreTokens()) {
523                    String token = st.nextToken();
524                    if (token.contains("=")) {
525                        String[] pair = token.split("=");
526                        if (pair.length != 2) {
527                            throw new CoordinatorEngineException(ErrorCode.E0420, filter,
528                                    "elements must be name=value pairs");
529                        }
530                        if (!FILTER_NAMES.contains(pair[0].toLowerCase())) {
531                            throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
532                                    pair[0]));
533                        }
534                        if (pair[0].equalsIgnoreCase("frequency")) {
535                            isFrequencySpecified = true;
536                            try {
537                                frequency = (int) Float.parseFloat(pair[1]) + "";
538                                continue;
539                            }
540                            catch (NumberFormatException NANException) {
541                                throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
542                                        "invalid value [{0}] for frequency. A numerical value is expected", pair[1]));
543                            }
544                        }
545                        if (pair[0].equalsIgnoreCase("unit")) {
546                            isTimeUnitSpecified = true;
547                            timeUnit = pair[1];
548                            if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days")
549                                    && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) {
550                                throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
551                                        "invalid value [{0}] for time unit. "
552                                                + "Valid value is one of months, days, hours or minutes", pair[1]));
553                            }
554                            continue;
555                        }
556                        if (pair[0].equals("status")) {
557                            try {
558                                CoordinatorJob.Status.valueOf(pair[1]);
559                            }
560                            catch (IllegalArgumentException ex) {
561                                throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
562                                        "invalid status [{0}]", pair[1]));
563                            }
564                        }
565                        List<String> list = map.get(pair[0]);
566                        if (list == null) {
567                            list = new ArrayList<String>();
568                            map.put(pair[0], list);
569                        }
570                        list.add(pair[1]);
571                    } else {
572                        throw new CoordinatorEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
573                    }
574                }
575                // Unit is specified and frequency is not specified
576                if (!isFrequencySpecified && isTimeUnitSpecified) {
577                    throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when "
578                            + "frequency is specified. Either specify frequency also or else remove the time unit");
579                } else if (isFrequencySpecified) {
580                    // Frequency value is specified
581                    if (isTimeUnitSpecified) {
582                        if (timeUnit.equalsIgnoreCase("months")) {
583                            timeUnit = "MONTH";
584                        } else if (timeUnit.equalsIgnoreCase("days")) {
585                            timeUnit = "DAY";
586                        } else if (timeUnit.equalsIgnoreCase("hours")) {
587                            // When job details are persisted to database, frequency in hours are converted to minutes.
588                            // This conversion is to conform with that.
589                            frequency = Integer.parseInt(frequency) * 60 + "";
590                            timeUnit = "MINUTE";
591                        } else if (timeUnit.equalsIgnoreCase("minutes")) {
592                            timeUnit = "MINUTE";
593                        }
594                    }
595                    // Adding the frequency and time unit filters to the filter map
596                    List<String> list = new ArrayList<String>();
597                    list.add(timeUnit);
598                    map.put("unit", list);
599                    list = new ArrayList<String>();
600                    list.add(frequency);
601                    map.put("frequency", list);
602                }
603            }
604            return map;
605        }
606    }