001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionInfoXCommand;
import org.apache.oozie.command.coord.CoordChangeXCommand;
import org.apache.oozie.command.coord.CoordJobXCommand;
import org.apache.oozie.command.coord.CoordJobsXCommand;
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.command.coord.CoordRerunXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
import org.apache.oozie.command.coord.CoordSuspendXCommand;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.util.CoordActionsInDateRange;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XLogStreamer;
054    
055    public class CoordinatorEngine extends BaseEngine {
056        private static XLog LOG = XLog.getLog(CoordinatorEngine.class);
057    
058        /**
059         * Create a system Coordinator engine, with no user and no group.
060         */
061        public CoordinatorEngine() {
062            if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
063                LOG.debug("Oozie CoordinatorEngine is not using XCommands.");
064            }
065            else {
066                LOG.debug("Oozie CoordinatorEngine is using XCommands.");
067            }
068        }
069    
070        /**
071         * Create a Coordinator engine to perform operations on behave of a user.
072         *
073         * @param user user name.
074         * @param authToken the authentication token.
075         */
076        public CoordinatorEngine(String user, String authToken) {
077            this();
078            this.user = ParamChecker.notEmpty(user, "user");
079            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
080        }
081    
082        /*
083         * (non-Javadoc)
084         *
085         * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
086         */
087        @Override
088        public String getDefinition(String jobId) throws BaseEngineException {
089            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
090            return job.getOrigJobXml();
091        }
092    
093        /**
094         * @param jobId
095         * @return CoordinatorJobBean
096         * @throws BaseEngineException
097         */
098        private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException {
099            try {
100                            return new CoordJobXCommand(jobId).call();
101            }
102            catch (CommandException ex) {
103                throw new BaseEngineException(ex);
104            }
105        }
106    
107        /**
108         * @param actionId
109         * @return CoordinatorActionBean
110         * @throws BaseEngineException
111         */
112        public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException {
113            try {
114                            return new CoordActionInfoXCommand(actionId).call();
115            }
116            catch (CommandException ex) {
117                throw new BaseEngineException(ex);
118            }
119        }
120    
121        /*
122         * (non-Javadoc)
123         *
124         * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
125         */
126        @Override
127        public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException {
128            try {
129                            return new CoordJobXCommand(jobId).call();
130            }
131            catch (CommandException ex) {
132                throw new BaseEngineException(ex);
133            }
134        }
135    
136        /*
137         * (non-Javadoc)
138         *
139         * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int)
140         */
141        @Override
142        public CoordinatorJobBean getCoordJob(String jobId, String filter, int start, int length) throws BaseEngineException {
143            List<String> filterList = parseStatusFilter(filter);
144            try {
145                            return new CoordJobXCommand(jobId, filterList, start, length)
146                                            .call();
147            }
148            catch (CommandException ex) {
149                throw new BaseEngineException(ex);
150            }
151        }
152    
    /*
     * (non-Javadoc)
     *
     * This implementation performs no lookup: it ignores externalId and always
     * returns null.
     *
     * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
     */
    @Override
    public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException {
        return null;
    }
162    
163        /*
164         * (non-Javadoc)
165         *
166         * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
167         */
168        @Override
169        public void kill(String jobId) throws CoordinatorEngineException {
170            try {
171                            new CoordKillXCommand(jobId).call();
172                LOG.info("User " + user + " killed the Coordinator job " + jobId);
173            }
174            catch (CommandException e) {
175                throw new CoordinatorEngineException(e);
176            }
177        }
178    
179        /* (non-Javadoc)
180         * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
181         */
182        @Override
183        public void change(String jobId, String changeValue) throws CoordinatorEngineException {
184            try {
185                new CoordChangeXCommand(jobId, changeValue).call();
186                LOG.info("User " + user + " changed the Coordinator job " + jobId + " to " + changeValue);
187            }
188            catch (CommandException e) {
189                throw new CoordinatorEngineException(e);
190            }
191        }
192    
193        @Override
194        @Deprecated
195        public void reRun(String jobId, Configuration conf) throws BaseEngineException {
196            throw new BaseEngineException(new XException(ErrorCode.E0301));
197        }
198    
199        /**
200         * Rerun coordinator actions for given rerunType
201         *
202         * @param jobId
203         * @param rerunType
204         * @param scope
205         * @param refresh
206         * @param noCleanup
207         * @throws BaseEngineException
208         */
209        public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup)
210                throws BaseEngineException {
211            try {
212                            return new CoordRerunXCommand(jobId, rerunType, scope, refresh,
213                                            noCleanup).call();
214            }
215            catch (CommandException ex) {
216                throw new BaseEngineException(ex);
217            }
218        }
219    
220        /*
221         * (non-Javadoc)
222         *
223         * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
224         */
225        @Override
226        public void resume(String jobId) throws CoordinatorEngineException {
227            try {
228                            new CoordResumeXCommand(jobId).call();
229            }
230            catch (CommandException e) {
231                throw new CoordinatorEngineException(e);
232            }
233        }
234    
235        @Override
236        @Deprecated
237        public void start(String jobId) throws BaseEngineException {
238            throw new BaseEngineException(new XException(ErrorCode.E0301));
239        }
240    
241        /*
242         * (non-Javadoc)
243         *
244         * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String,
245         * java.io.Writer)
246         */
247        @Override
248        public void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException {
249            XLogStreamer.Filter filter = new XLogStreamer.Filter();
250            filter.setParameter(DagXLogInfoService.JOB, jobId);
251    
252            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
253            Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
254        }
255    
256        /**
257         * Add list of actions to the filter based on conditions
258         *
259         * @param jobId Job Id
260         * @param logRetrievalScope Value for the retrieval type
261         * @param logRetrievalType Based on which filter criteria the log is retrieved
262         * @param writer writer to stream the log to
263         * @throws IOException
264         * @throws BaseEngineException
265         * @throws CommandException
266         */
267        public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer)
268                throws IOException, BaseEngineException, CommandException {
269            XLogStreamer.Filter filter = new XLogStreamer.Filter();
270            filter.setParameter(DagXLogInfoService.JOB, jobId);
271            if (logRetrievalScope != null && logRetrievalType != null) {
272                // if coordinator action logs are to be retrieved based on action id range
273                if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) {
274                    Set<String> actions = new HashSet<String>();
275                    String[] list = logRetrievalScope.split(",");
276                    for (String s : list) {
277                        s = s.trim();
278                        if (s.contains("-")) {
279                            String[] range = s.split("-");
280                            if (range.length != 2) {
281                                throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
282                                        + "'");
283                            }
284                            int start;
285                            int end;
286                            try {
287                                start = Integer.parseInt(range[0].trim());
288                                end = Integer.parseInt(range[1].trim());
289                                if (start > end) {
290                                    throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
291                                            + "'");
292                                }
293                            }
294                            catch (NumberFormatException ne) {
295                                throw new CommandException(ErrorCode.E0302, ne);
296                            }
297                            for (int i = start; i <= end; i++) {
298                                actions.add(jobId + "@" + i);
299                            }
300                        }
301                        else {
302                            try {
303                                Integer.parseInt(s);
304                            }
305                            catch (NumberFormatException ne) {
306                                throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
307                                        + "'. Integer only.");
308                            }
309                            actions.add(jobId + "@" + s);
310                        }
311                    }
312    
313                    Iterator<String> actionsIterator = actions.iterator();
314                    StringBuilder orSeparatedActions = new StringBuilder("");
315                    boolean orRequired = false;
316                    while (actionsIterator.hasNext()) {
317                        if (orRequired) {
318                            orSeparatedActions.append("|");
319                        }
320                        orSeparatedActions.append(actionsIterator.next().toString());
321                        orRequired = true;
322                    }
323                    if (actions.size() > 1 && orRequired) {
324                        orSeparatedActions.insert(0, "(");
325                        orSeparatedActions.append(")");
326                    }
327                    filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
328                }
329                // if coordinator action logs are to be retrieved based on date range
330                // this block gets the corresponding list of coordinator actions to be used by the log filter
331                if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) {
332                    List<String> coordActionIdList = null;
333                    try {
334                        coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId, logRetrievalScope);
335                    }
336                    catch (XException xe) {
337                        throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe);
338                    }
339                    StringBuilder orSeparatedActions = new StringBuilder("");
340                    boolean orRequired = false;
341                    for (String coordActionId : coordActionIdList) {
342                        if (orRequired) {
343                            orSeparatedActions.append("|");
344                        }
345                        orSeparatedActions.append(coordActionId);
346                        orRequired = true;
347                    }
348                    if (coordActionIdList.size() > 1 && orRequired) {
349                        orSeparatedActions.insert(0, "(");
350                        orSeparatedActions.append(")");
351                    }
352                    filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
353                }
354            }
355            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
356            Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
357        }
358    
359        /*
360         * (non-Javadoc)
361         *
362         * @see
363         * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration
364         * , boolean)
365         */
366        @Override
367        public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException {
368            try {
369                            CoordSubmitXCommand submit = new CoordSubmitXCommand(conf,
370                                            getAuthToken());
371                            return submit.call();
372            }
373            catch (CommandException ex) {
374                throw new CoordinatorEngineException(ex);
375            }
376        }
377    
378        /*
379         * (non-Javadoc)
380         *
381         * @see
382         * org.apache.oozie.BaseEngine#dryrunSubmit(org.apache.hadoop.conf.Configuration
383         * , boolean)
384         */
385        @Override
386        public String dryrunSubmit(Configuration conf, boolean startJob) throws CoordinatorEngineException {
387            try {
388                            CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf,
389                                            getAuthToken());
390                            return submit.call();
391            }
392            catch (CommandException ex) {
393                throw new CoordinatorEngineException(ex);
394            }
395        }
396    
397        /*
398         * (non-Javadoc)
399         *
400         * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
401         */
402        @Override
403        public void suspend(String jobId) throws CoordinatorEngineException {
404            try {
405                            new CoordSuspendXCommand(jobId).call();
406            }
407            catch (CommandException e) {
408                throw new CoordinatorEngineException(e);
409            }
410    
411        }
412    
413        /*
414         * (non-Javadoc)
415         *
416         * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
417         */
418        @Override
419        public WorkflowJob getJob(String jobId) throws BaseEngineException {
420            throw new BaseEngineException(new XException(ErrorCode.E0301));
421        }
422    
423        /*
424         * (non-Javadoc)
425         *
426         * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
427         */
428        @Override
429        public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException {
430            throw new BaseEngineException(new XException(ErrorCode.E0301));
431        }
432    
433        private static final Set<String> FILTER_NAMES = new HashSet<String>();
434    
435        static {
436            FILTER_NAMES.add(OozieClient.FILTER_USER);
437            FILTER_NAMES.add(OozieClient.FILTER_NAME);
438            FILTER_NAMES.add(OozieClient.FILTER_GROUP);
439            FILTER_NAMES.add(OozieClient.FILTER_STATUS);
440            FILTER_NAMES.add(OozieClient.FILTER_ID);
441            FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY);
442            FILTER_NAMES.add(OozieClient.FILTER_UNIT);
443        }
444    
445        /**
446         * @param filter
447         * @param start
448         * @param len
449         * @return CoordinatorJobInfo
450         * @throws CoordinatorEngineException
451         */
452        public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException {
453            Map<String, List<String>> filterList = parseFilter(filter);
454    
455            try {
456                            return new CoordJobsXCommand(filterList, start, len).call();
457            }
458            catch (CommandException ex) {
459                throw new CoordinatorEngineException(ex);
460            }
461        }
462    
463    
464        // Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values
465        private List<String> parseStatusFilter(String filter) throws CoordinatorEngineException {
466            List<String> filterList = new ArrayList<String>();
467            if (filter != null) {
468                //split name;value pairs
469                StringTokenizer st = new StringTokenizer(filter, ";");
470                while (st.hasMoreTokens()) {
471                    String token = st.nextToken();
472                    if (token.contains("=")) {
473                        String[] pair = token.split("=");
474                        if (pair.length != 2) {
475                            throw new CoordinatorEngineException(ErrorCode.E0421, token,
476                                    "elements must be name=value pairs");
477                        }
478                        if (pair[0].equalsIgnoreCase("status")) {
479                            String statusValue = pair[1];
480                            try {
481                                CoordinatorAction.Status.valueOf(statusValue);
482                            } catch (IllegalArgumentException ex) {
483                                StringBuilder validStatusList = new StringBuilder();
484                                for (CoordinatorAction.Status status: CoordinatorAction.Status.values()){
485                                    validStatusList.append(status.toString()+" ");
486                                }
487                                // Check for incorrect status value
488                                throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
489                                        "invalid status value [{0}]." + " Valid status values are: [{1}]", statusValue, validStatusList));
490                            }
491                            filterList.add(statusValue);
492                        } else {
493                            // Check for incorrect filter option
494                            throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
495                                    "invalid filter [{0}]." + " The only valid filter is \"status\"", pair[0]));
496                        }
497                    } else {
498                        throw new CoordinatorEngineException(ErrorCode.E0421, token,
499                                 "elements must be name=value pairs");
500                    }
501                }
502            }
503            return filterList;
504        }
505    
506        /**
507         * @param filter
508         * @return Map<String, List<String>>
509         * @throws CoordinatorEngineException
510         */
511        private Map<String, List<String>> parseFilter(String filter) throws CoordinatorEngineException {
512            Map<String, List<String>> map = new HashMap<String, List<String>>();
513            boolean isTimeUnitSpecified = false;
514            String timeUnit = "MINUTE";
515            boolean isFrequencySpecified = false;
516            String frequency = "";
517            if (filter != null) {
518                StringTokenizer st = new StringTokenizer(filter, ";");
519                while (st.hasMoreTokens()) {
520                    String token = st.nextToken();
521                    if (token.contains("=")) {
522                        String[] pair = token.split("=");
523                        if (pair.length != 2) {
524                            throw new CoordinatorEngineException(ErrorCode.E0420, filter,
525                                    "elements must be name=value pairs");
526                        }
527                        if (!FILTER_NAMES.contains(pair[0].toLowerCase())) {
528                            throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
529                                    pair[0]));
530                        }
531                        if (pair[0].equalsIgnoreCase("frequency")) {
532                            isFrequencySpecified = true;
533                            try {
534                                frequency = (int) Float.parseFloat(pair[1]) + "";
535                                continue;
536                            }
537                            catch (NumberFormatException NANException) {
538                                throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
539                                        "invalid value [{0}] for frequency. A numerical value is expected", pair[1]));
540                            }
541                        }
542                        if (pair[0].equalsIgnoreCase("unit")) {
543                            isTimeUnitSpecified = true;
544                            timeUnit = pair[1];
545                            if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days")
546                                    && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) {
547                                throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
548                                        "invalid value [{0}] for time unit. "
549                                                + "Valid value is one of months, days, hours or minutes", pair[1]));
550                            }
551                            continue;
552                        }
553                        if (pair[0].equals("status")) {
554                            try {
555                                CoordinatorJob.Status.valueOf(pair[1]);
556                            }
557                            catch (IllegalArgumentException ex) {
558                                throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
559                                        "invalid status [{0}]", pair[1]));
560                            }
561                        }
562                        List<String> list = map.get(pair[0]);
563                        if (list == null) {
564                            list = new ArrayList<String>();
565                            map.put(pair[0], list);
566                        }
567                        list.add(pair[1]);
568                    } else {
569                        throw new CoordinatorEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
570                    }
571                }
572                // Unit is specified and frequency is not specified
573                if (!isFrequencySpecified && isTimeUnitSpecified) {
574                    throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when "
575                            + "frequency is specified. Either specify frequency also or else remove the time unit");
576                } else if (isFrequencySpecified) {
577                    // Frequency value is specified
578                    if (isTimeUnitSpecified) {
579                        if (timeUnit.equalsIgnoreCase("months")) {
580                            timeUnit = "MONTH";
581                        } else if (timeUnit.equalsIgnoreCase("days")) {
582                            timeUnit = "DAY";
583                        } else if (timeUnit.equalsIgnoreCase("hours")) {
584                            // When job details are persisted to database, frequency in hours are converted to minutes.
585                            // This conversion is to conform with that.
586                            frequency = Integer.parseInt(frequency) * 60 + "";
587                            timeUnit = "MINUTE";
588                        } else if (timeUnit.equalsIgnoreCase("minutes")) {
589                            timeUnit = "MINUTE";
590                        }
591                    }
592                    // Adding the frequency and time unit filters to the filter map
593                    List<String> list = new ArrayList<String>();
594                    list.add(timeUnit);
595                    map.put("unit", list);
596                    list = new ArrayList<String>();
597                    list.add(frequency);
598                    map.put("frequency", list);
599                }
600            }
601            return map;
602        }
603    }