001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.ArrayList;
023    import java.util.Date;
024    import java.util.HashSet;
025    import java.util.List;
026    import java.util.Set;
027    
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.fs.FileSystem;
030    import org.apache.hadoop.fs.Path;
031    import org.apache.oozie.CoordinatorActionBean;
032    import org.apache.oozie.CoordinatorActionInfo;
033    import org.apache.oozie.CoordinatorJobBean;
034    import org.apache.oozie.ErrorCode;
035    import org.apache.oozie.XException;
036    import org.apache.oozie.client.CoordinatorAction;
037    import org.apache.oozie.client.CoordinatorJob;
038    import org.apache.oozie.client.SLAEvent.SlaAppType;
039    import org.apache.oozie.client.rest.RestConstants;
040    import org.apache.oozie.command.CommandException;
041    import org.apache.oozie.coord.CoordELFunctions;
042    import org.apache.oozie.service.HadoopAccessorService;
043    import org.apache.oozie.service.Services;
044    import org.apache.oozie.store.CoordinatorStore;
045    import org.apache.oozie.store.StoreException;
046    import org.apache.oozie.util.DateUtils;
047    import org.apache.oozie.util.ParamChecker;
048    import org.apache.oozie.util.XConfiguration;
049    import org.apache.oozie.util.XLog;
050    import org.apache.oozie.util.XmlUtils;
051    import org.apache.oozie.util.db.SLADbOperations;
052    import org.jdom.Element;
053    import org.jdom.JDOMException;
054    
055    public class CoordRerunCommand extends CoordinatorCommand<CoordinatorActionInfo> {
056    
057        private String jobId;
058        private String rerunType;
059        private String scope;
060        private boolean refresh;
061        private boolean noCleanup;
062        private final XLog log = XLog.getLog(getClass());
063    
064        public CoordRerunCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
065            super("coord_rerun", "coord_rerun", 1, XLog.STD);
066            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
067            this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType");
068            this.scope = ParamChecker.notEmpty(scope, "scope");
069            this.refresh = refresh;
070            this.noCleanup = noCleanup;
071        }
072    
073        @Override
074        protected CoordinatorActionInfo call(CoordinatorStore store) throws StoreException, CommandException {
075            try {
076                CoordinatorJobBean coordJob = store.getCoordinatorJob(jobId, false);
077                CoordinatorActionInfo coordInfo = null;
078                setLogInfo(coordJob);
079                if (coordJob.getStatus() != CoordinatorJob.Status.KILLED
080                        && coordJob.getStatus() != CoordinatorJob.Status.FAILED) {
081                    incrJobCounter(1);
082    
083                    List<CoordinatorActionBean> coordActions;
084                    if (rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE)) {
085                        coordActions = getCoordActionsFromDates(jobId, scope, store);
086                    }
087                    else if (rerunType.equals(RestConstants.JOB_COORD_RERUN_ACTION)) {
088                        coordActions = getCoordActionsFromIds(jobId, scope, store);
089                    }
090                    else {
091                        throw new CommandException(ErrorCode.E1018, "date or action expected.");
092                    }
093                    if (checkAllActionsRunnable(coordActions)) {
094                        Configuration conf = new XConfiguration(new StringReader(coordJob.getConf()));
095                        for (CoordinatorActionBean coordAction : coordActions) {
096                            String actionXml = coordAction.getActionXml();
097                            if (!noCleanup) {
098                                Element eAction = XmlUtils.parseXml(actionXml);
099                                cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup(), conf);
100                            }
101                            if (refresh) {
102                                refreshAction(coordJob, coordAction, store);
103                            }
104                            updateAction(coordJob, coordAction, actionXml, store);
105    
106                            // TODO: time 100s should be configurable
107                            queueCallable(new CoordActionNotification(coordAction), 100);
108                            queueCallable(new CoordActionInputCheckCommand(coordAction.getId()), 100);
109                        }
110                    }
111                    else {
112                        throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!");
113                    }
114                    coordInfo = new CoordinatorActionInfo(coordActions);
115                }
116                else {
117                    log.info("CoordRerunCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid="
118                            + jobId);
119                    throw new CommandException(ErrorCode.E1018,
120                            "coordinator job is killed or failed so all actions are not eligible to rerun!");
121                }
122                return coordInfo;
123            }
124            catch (XException xex) {
125                throw new CommandException(xex);
126            }
127            catch (JDOMException jex) {
128                throw new CommandException(ErrorCode.E0700, jex);
129            }
130            catch (Exception ex) {
131                throw new CommandException(ErrorCode.E1018, ex);
132            }
133        }
134    
135        /**
136         * Get the list of actions for given id ranges
137         *
138         * @param jobId
139         * @param scope
140         * @param store
141         * @return the list of all actions to rerun
142         * @throws CommandException
143         * @throws StoreException
144         */
145        private List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope, CoordinatorStore store)
146                throws CommandException, StoreException {
147            ParamChecker.notEmpty(jobId, "jobId");
148            ParamChecker.notEmpty(scope, "scope");
149    
150            Set<String> actions = new HashSet<String>();
151            String[] list = scope.split(",");
152            for (String s : list) {
153                s = s.trim();
154                if (s.contains("-")) {
155                    String[] range = s.split("-");
156                    if (range.length != 2) {
157                        throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
158                    }
159                    int start;
160                    int end;
161                    try {
162                        start = Integer.parseInt(range[0].trim());
163                        end = Integer.parseInt(range[1].trim());
164                        if (start > end) {
165                            throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
166                        }
167                    }
168                    catch (NumberFormatException ne) {
169                        throw new CommandException(ErrorCode.E0302, ne);
170                    }
171                    for (int i = start; i <= end; i++) {
172                        actions.add(jobId + "@" + i);
173                    }
174                }
175                else {
176                    try {
177                        Integer.parseInt(s);
178                    }
179                    catch (NumberFormatException ne) {
180                        throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
181                                + "'. Integer only.");
182                    }
183                    actions.add(jobId + "@" + s);
184                }
185            }
186    
187            List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
188            for (String id : actions) {
189                CoordinatorActionBean coordAction = store.getCoordinatorAction(id, false);
190                coordActions.add(coordAction);
191                log.debug("Rerun coordinator for actionId='" + id + "'");
192            }
193            return coordActions;
194        }
195    
196        /**
197         * Get the list of actions for given date ranges
198         *
199         * @param jobId
200         * @param scope
201         * @param store
202         * @return the list of dates to rerun
203         * @throws CommandException
204         * @throws StoreException
205         */
206        private List<CoordinatorActionBean> getCoordActionsFromDates(String jobId, String scope, CoordinatorStore store)
207                throws CommandException, StoreException {
208            ParamChecker.notEmpty(jobId, "jobId");
209            ParamChecker.notEmpty(scope, "scope");
210    
211            Set<CoordinatorActionBean> actionSet = new HashSet<CoordinatorActionBean>();
212            String[] list = scope.split(",");
213            for (String s : list) {
214                s = s.trim();
215                if (s.contains("::")) {
216                    String[] dateRange = s.split("::");
217                    if (dateRange.length != 2) {
218                        throw new CommandException(ErrorCode.E0302, "format is wrong for date's range '" + s + "'");
219                    }
220                    Date start;
221                    Date end;
222                    try {
223                        start = DateUtils.parseDateUTC(dateRange[0].trim());
224                        end = DateUtils.parseDateUTC(dateRange[1].trim());
225                        if (start.after(end)) {
226                            throw new CommandException(ErrorCode.E0302, "start date is older than end date: '" + s + "'");
227                        }
228                    }
229                    catch (Exception e) {
230                        throw new CommandException(ErrorCode.E0302, e);
231                    }
232    
233                    List<CoordinatorActionBean> listOfActions = getActionIdsFromDateRange(jobId, start, end, store);
234                    actionSet.addAll(listOfActions);
235                }
236                else {
237                    Date date;
238                    try {
239                        date = DateUtils.parseDateUTC(s.trim());
240                    }
241                    catch (Exception e) {
242                        throw new CommandException(ErrorCode.E0302, e);
243                    }
244    
245                    CoordinatorActionBean coordAction = store.getCoordActionForNominalTime(jobId, date);
246                    actionSet.add(coordAction);
247                }
248            }
249    
250            List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
251            for (CoordinatorActionBean coordAction : actionSet) {
252                coordActions.add(coordAction);
253                log.debug("Rerun coordinator for actionId='" + coordAction.getId() + "'");
254            }
255            return coordActions;
256        }
257    
258        private List<CoordinatorActionBean> getActionIdsFromDateRange(String jobId, Date start, Date end,
259                CoordinatorStore store)
260                throws StoreException {
261            List<CoordinatorActionBean> list = store.getCoordActionsForDates(jobId, start, end);
262            return list;
263        }
264    
265        /**
266         * Check if all given actions are eligible to rerun.
267         *
268         * @param actions list of CoordinatorActionBean
269         * @return true if all actions are eligible to rerun
270         */
271        private boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) {
272            for (CoordinatorActionBean coordAction : coordActions) {
273                if (!coordAction.isTerminalStatus()) {
274                    return false;
275                }
276            }
277            return true;
278        }
279    
280        /**
281         * Cleanup output-events directories
282         *
283         * @param eAction
284         * @param workflow
285         * @param action
286         */
287        @SuppressWarnings("unchecked")
288        private void cleanupOutputEvents(Element eAction, String user, String group, Configuration conf) {
289            Element outputList = eAction.getChild("output-events", eAction.getNamespace());
290            if (outputList != null) {
291                for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
292                    if (data.getChild("uris", data.getNamespace()) != null) {
293                        String uris = data.getChild("uris", data.getNamespace()).getTextTrim();
294                        if (uris != null) {
295                            String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR);
296                            for (String uri : uriArr) {
297                                Path path = new Path(uri);
298                                try {
299                                    FileSystem fs = Services.get().get(HadoopAccessorService.class).
300                                            createFileSystem(user, group, path.toUri(), conf);
301                                    if (fs.exists(path)) {
302                                        if (!fs.delete(path, true)) {
303                                            throw new IOException();
304                                        }
305                                    }
306                                    log.debug("Cleanup the output dir " + path);
307                                }
308                                catch (Exception ex) {
309                                    log.warn("Failed to cleanup the output dir " + uri, ex);
310                                }
311                            }
312                        }
313    
314                    }
315                }
316            }
317            else {
318                log.info("No output-events defined in coordinator xml. Therefore nothing to cleanup");
319            }
320        }
321    
322        /**
323         * Refresh an Action
324         *
325         * @param coordJob
326         * @param coordAction
327         * @param store
328         * @throws Exception
329         */
330        private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, CoordinatorStore store)
331                throws Exception {
332            Configuration jobConf = null;
333            try {
334                jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
335            }
336            catch (IOException ioe) {
337                log.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
338                throw new CommandException(ErrorCode.E1005, ioe);
339            }
340            String jobXml = coordJob.getJobXml();
341            Element eJob = XmlUtils.parseXml(jobXml);
342            Date actualTime = new Date();
343            String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction
344                    .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction);
345            log.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml="
346                    + XmlUtils.prettyPrint(actionXml).toString());
347            coordAction.setActionXml(actionXml);
348        }
349    
350        /**
351         * Update an Action into database table
352         *
353         * @param coordJob
354         * @param coordAction
355         * @param actionXml
356         * @param store
357         * @throws Exception
358         */
359        private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml,
360                CoordinatorStore store) throws Exception {
361            log.debug("updateAction for actionId=" + coordAction.getId());
362            coordAction.setStatus(CoordinatorAction.Status.WAITING);
363            coordAction.setExternalId("");
364            coordAction.setExternalStatus("");
365            coordAction.setRerunTime(new Date());
366            store.updateCoordinatorAction(coordAction);
367            writeActionRegistration(coordAction.getActionXml(), coordAction, store, coordJob.getUser(), coordJob.getGroup());
368        }
369    
370        /**
371         * Create SLA RegistrationEvent
372         *
373         * @param actionXml
374         * @param actionBean
375         * @param store
376         * @param user
377         * @param group
378         * @throws Exception
379         */
380        private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, CoordinatorStore store,
381                String user, String group)
382                throws Exception {
383            Element eAction = XmlUtils.parseXml(actionXml);
384            Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
385            SLADbOperations.writeSlaRegistrationEvent(eSla, store, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, user,
386                    group);
387        }
388    
389        @Override
390        protected CoordinatorActionInfo execute(CoordinatorStore store) throws StoreException, CommandException {
391            log.info("STARTED CoordRerunCommand for jobId=" + jobId + ", scope=" + scope);
392            CoordinatorActionInfo coordInfo = null;
393            try {
394                if (lock(jobId)) {
395                    coordInfo = call(store);
396                }
397                else {
398                    queueCallable(new CoordResumeCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL);
399                    log.warn("CoordRerunCommand lock was not acquired - " + " failed " + jobId + ". Requeing the same.");
400                }
401            }
402            catch (InterruptedException e) {
403                queueCallable(new CoordResumeCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL);
404                log.warn("CoordRerunCommand lock acquiring failed " + " with exception " + e.getMessage() + " for job id "
405                        + jobId + ". Requeing the same.");
406            }
407            finally {
408                log.info("ENDED CoordRerunCommand for jobId=" + jobId + ", scope=" + scope);
409            }
410            return coordInfo;
411        }
412    
413    }