This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.ArrayList;
023 import java.util.Date;
024 import java.util.HashSet;
025 import java.util.List;
026 import java.util.Set;
027
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.fs.FileSystem;
030 import org.apache.hadoop.fs.Path;
031 import org.apache.oozie.CoordinatorActionBean;
032 import org.apache.oozie.CoordinatorActionInfo;
033 import org.apache.oozie.CoordinatorJobBean;
034 import org.apache.oozie.ErrorCode;
035 import org.apache.oozie.XException;
036 import org.apache.oozie.client.CoordinatorAction;
037 import org.apache.oozie.client.CoordinatorJob;
038 import org.apache.oozie.client.SLAEvent.SlaAppType;
039 import org.apache.oozie.client.rest.RestConstants;
040 import org.apache.oozie.command.CommandException;
041 import org.apache.oozie.coord.CoordELFunctions;
042 import org.apache.oozie.service.HadoopAccessorService;
043 import org.apache.oozie.service.Services;
044 import org.apache.oozie.store.CoordinatorStore;
045 import org.apache.oozie.store.StoreException;
046 import org.apache.oozie.util.DateUtils;
047 import org.apache.oozie.util.ParamChecker;
048 import org.apache.oozie.util.XConfiguration;
049 import org.apache.oozie.util.XLog;
050 import org.apache.oozie.util.XmlUtils;
051 import org.apache.oozie.util.db.SLADbOperations;
052 import org.jdom.Element;
053 import org.jdom.JDOMException;
054
055 public class CoordRerunCommand extends CoordinatorCommand<CoordinatorActionInfo> {
056
/** Id of the coordinator job whose actions are rerun. */
private final String jobId;
/**
 * Selector for how {@link #scope} is interpreted; compared against
 * {@code RestConstants.JOB_COORD_RERUN_DATE} and {@code RestConstants.JOB_COORD_RERUN_ACTION}.
 */
private final String rerunType;
/** Comma separated list of action numbers / number ranges, or of dates / date ranges. */
private final String scope;
/** When {@code true} the action XML is re-materialized before the action is reset. */
private final boolean refresh;
/** When {@code true} the output-events directories of the action are left untouched. */
private final boolean noCleanup;
private final XLog log = XLog.getLog(getClass());

/**
 * Create a coordinator rerun command.
 * <p>
 * All fields are assigned exactly once here and never modified afterwards, hence they are final.
 *
 * @param jobId coordinator job id, checked with {@code ParamChecker.notEmpty}
 * @param rerunType rerun type (date based or action based), checked with {@code ParamChecker.notEmpty}
 * @param scope the dates or action numbers to rerun, checked with {@code ParamChecker.notEmpty}
 * @param refresh true to re-materialize the action XML before rerunning
 * @param noCleanup true to skip the cleanup of the output-events directories
 */
public CoordRerunCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
    super("coord_rerun", "coord_rerun", 1, XLog.STD);
    this.jobId = ParamChecker.notEmpty(jobId, "jobId");
    this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType");
    this.scope = ParamChecker.notEmpty(scope, "scope");
    this.refresh = refresh;
    this.noCleanup = noCleanup;
}
072
073 @Override
074 protected CoordinatorActionInfo call(CoordinatorStore store) throws StoreException, CommandException {
075 try {
076 CoordinatorJobBean coordJob = store.getCoordinatorJob(jobId, false);
077 CoordinatorActionInfo coordInfo = null;
078 setLogInfo(coordJob);
079 if (coordJob.getStatus() != CoordinatorJob.Status.KILLED
080 && coordJob.getStatus() != CoordinatorJob.Status.FAILED) {
081 incrJobCounter(1);
082
083 List<CoordinatorActionBean> coordActions;
084 if (rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE)) {
085 coordActions = getCoordActionsFromDates(jobId, scope, store);
086 }
087 else if (rerunType.equals(RestConstants.JOB_COORD_RERUN_ACTION)) {
088 coordActions = getCoordActionsFromIds(jobId, scope, store);
089 }
090 else {
091 throw new CommandException(ErrorCode.E1018, "date or action expected.");
092 }
093 if (checkAllActionsRunnable(coordActions)) {
094 Configuration conf = new XConfiguration(new StringReader(coordJob.getConf()));
095 for (CoordinatorActionBean coordAction : coordActions) {
096 String actionXml = coordAction.getActionXml();
097 if (!noCleanup) {
098 Element eAction = XmlUtils.parseXml(actionXml);
099 cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup(), conf);
100 }
101 if (refresh) {
102 refreshAction(coordJob, coordAction, store);
103 }
104 updateAction(coordJob, coordAction, actionXml, store);
105
106 // TODO: time 100s should be configurable
107 queueCallable(new CoordActionNotification(coordAction), 100);
108 queueCallable(new CoordActionInputCheckCommand(coordAction.getId()), 100);
109 }
110 }
111 else {
112 throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!");
113 }
114 coordInfo = new CoordinatorActionInfo(coordActions);
115 }
116 else {
117 log.info("CoordRerunCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid="
118 + jobId);
119 throw new CommandException(ErrorCode.E1018,
120 "coordinator job is killed or failed so all actions are not eligible to rerun!");
121 }
122 return coordInfo;
123 }
124 catch (XException xex) {
125 throw new CommandException(xex);
126 }
127 catch (JDOMException jex) {
128 throw new CommandException(ErrorCode.E0700, jex);
129 }
130 catch (Exception ex) {
131 throw new CommandException(ErrorCode.E1018, ex);
132 }
133 }
134
135 /**
136 * Get the list of actions for given id ranges
137 *
138 * @param jobId
139 * @param scope
140 * @param store
141 * @return the list of all actions to rerun
142 * @throws CommandException
143 * @throws StoreException
144 */
145 private List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope, CoordinatorStore store)
146 throws CommandException, StoreException {
147 ParamChecker.notEmpty(jobId, "jobId");
148 ParamChecker.notEmpty(scope, "scope");
149
150 Set<String> actions = new HashSet<String>();
151 String[] list = scope.split(",");
152 for (String s : list) {
153 s = s.trim();
154 if (s.contains("-")) {
155 String[] range = s.split("-");
156 if (range.length != 2) {
157 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
158 }
159 int start;
160 int end;
161 try {
162 start = Integer.parseInt(range[0].trim());
163 end = Integer.parseInt(range[1].trim());
164 if (start > end) {
165 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
166 }
167 }
168 catch (NumberFormatException ne) {
169 throw new CommandException(ErrorCode.E0302, ne);
170 }
171 for (int i = start; i <= end; i++) {
172 actions.add(jobId + "@" + i);
173 }
174 }
175 else {
176 try {
177 Integer.parseInt(s);
178 }
179 catch (NumberFormatException ne) {
180 throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
181 + "'. Integer only.");
182 }
183 actions.add(jobId + "@" + s);
184 }
185 }
186
187 List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
188 for (String id : actions) {
189 CoordinatorActionBean coordAction = store.getCoordinatorAction(id, false);
190 coordActions.add(coordAction);
191 log.debug("Rerun coordinator for actionId='" + id + "'");
192 }
193 return coordActions;
194 }
195
196 /**
197 * Get the list of actions for given date ranges
198 *
199 * @param jobId
200 * @param scope
201 * @param store
202 * @return the list of dates to rerun
203 * @throws CommandException
204 * @throws StoreException
205 */
206 private List<CoordinatorActionBean> getCoordActionsFromDates(String jobId, String scope, CoordinatorStore store)
207 throws CommandException, StoreException {
208 ParamChecker.notEmpty(jobId, "jobId");
209 ParamChecker.notEmpty(scope, "scope");
210
211 Set<CoordinatorActionBean> actionSet = new HashSet<CoordinatorActionBean>();
212 String[] list = scope.split(",");
213 for (String s : list) {
214 s = s.trim();
215 if (s.contains("::")) {
216 String[] dateRange = s.split("::");
217 if (dateRange.length != 2) {
218 throw new CommandException(ErrorCode.E0302, "format is wrong for date's range '" + s + "'");
219 }
220 Date start;
221 Date end;
222 try {
223 start = DateUtils.parseDateUTC(dateRange[0].trim());
224 end = DateUtils.parseDateUTC(dateRange[1].trim());
225 if (start.after(end)) {
226 throw new CommandException(ErrorCode.E0302, "start date is older than end date: '" + s + "'");
227 }
228 }
229 catch (Exception e) {
230 throw new CommandException(ErrorCode.E0302, e);
231 }
232
233 List<CoordinatorActionBean> listOfActions = getActionIdsFromDateRange(jobId, start, end, store);
234 actionSet.addAll(listOfActions);
235 }
236 else {
237 Date date;
238 try {
239 date = DateUtils.parseDateUTC(s.trim());
240 }
241 catch (Exception e) {
242 throw new CommandException(ErrorCode.E0302, e);
243 }
244
245 CoordinatorActionBean coordAction = store.getCoordActionForNominalTime(jobId, date);
246 actionSet.add(coordAction);
247 }
248 }
249
250 List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
251 for (CoordinatorActionBean coordAction : actionSet) {
252 coordActions.add(coordAction);
253 log.debug("Rerun coordinator for actionId='" + coordAction.getId() + "'");
254 }
255 return coordActions;
256 }
257
258 private List<CoordinatorActionBean> getActionIdsFromDateRange(String jobId, Date start, Date end,
259 CoordinatorStore store)
260 throws StoreException {
261 List<CoordinatorActionBean> list = store.getCoordActionsForDates(jobId, start, end);
262 return list;
263 }
264
265 /**
266 * Check if all given actions are eligible to rerun.
267 *
268 * @param actions list of CoordinatorActionBean
269 * @return true if all actions are eligible to rerun
270 */
271 private boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) {
272 for (CoordinatorActionBean coordAction : coordActions) {
273 if (!coordAction.isTerminalStatus()) {
274 return false;
275 }
276 }
277 return true;
278 }
279
280 /**
281 * Cleanup output-events directories
282 *
283 * @param eAction
284 * @param workflow
285 * @param action
286 */
287 @SuppressWarnings("unchecked")
288 private void cleanupOutputEvents(Element eAction, String user, String group, Configuration conf) {
289 Element outputList = eAction.getChild("output-events", eAction.getNamespace());
290 if (outputList != null) {
291 for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
292 if (data.getChild("uris", data.getNamespace()) != null) {
293 String uris = data.getChild("uris", data.getNamespace()).getTextTrim();
294 if (uris != null) {
295 String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR);
296 for (String uri : uriArr) {
297 Path path = new Path(uri);
298 try {
299 FileSystem fs = Services.get().get(HadoopAccessorService.class).
300 createFileSystem(user, group, path.toUri(), conf);
301 if (fs.exists(path)) {
302 if (!fs.delete(path, true)) {
303 throw new IOException();
304 }
305 }
306 log.debug("Cleanup the output dir " + path);
307 }
308 catch (Exception ex) {
309 log.warn("Failed to cleanup the output dir " + uri, ex);
310 }
311 }
312 }
313
314 }
315 }
316 }
317 else {
318 log.info("No output-events defined in coordinator xml. Therefore nothing to cleanup");
319 }
320 }
321
322 /**
323 * Refresh an Action
324 *
325 * @param coordJob
326 * @param coordAction
327 * @param store
328 * @throws Exception
329 */
330 private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, CoordinatorStore store)
331 throws Exception {
332 Configuration jobConf = null;
333 try {
334 jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
335 }
336 catch (IOException ioe) {
337 log.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
338 throw new CommandException(ErrorCode.E1005, ioe);
339 }
340 String jobXml = coordJob.getJobXml();
341 Element eJob = XmlUtils.parseXml(jobXml);
342 Date actualTime = new Date();
343 String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction
344 .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction);
345 log.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml="
346 + XmlUtils.prettyPrint(actionXml).toString());
347 coordAction.setActionXml(actionXml);
348 }
349
350 /**
351 * Update an Action into database table
352 *
353 * @param coordJob
354 * @param coordAction
355 * @param actionXml
356 * @param store
357 * @throws Exception
358 */
359 private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml,
360 CoordinatorStore store) throws Exception {
361 log.debug("updateAction for actionId=" + coordAction.getId());
362 coordAction.setStatus(CoordinatorAction.Status.WAITING);
363 coordAction.setExternalId("");
364 coordAction.setExternalStatus("");
365 coordAction.setRerunTime(new Date());
366 store.updateCoordinatorAction(coordAction);
367 writeActionRegistration(coordAction.getActionXml(), coordAction, store, coordJob.getUser(), coordJob.getGroup());
368 }
369
370 /**
371 * Create SLA RegistrationEvent
372 *
373 * @param actionXml
374 * @param actionBean
375 * @param store
376 * @param user
377 * @param group
378 * @throws Exception
379 */
380 private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, CoordinatorStore store,
381 String user, String group)
382 throws Exception {
383 Element eAction = XmlUtils.parseXml(actionXml);
384 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
385 SLADbOperations.writeSlaRegistrationEvent(eSla, store, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, user,
386 group);
387 }
388
389 @Override
390 protected CoordinatorActionInfo execute(CoordinatorStore store) throws StoreException, CommandException {
391 log.info("STARTED CoordRerunCommand for jobId=" + jobId + ", scope=" + scope);
392 CoordinatorActionInfo coordInfo = null;
393 try {
394 if (lock(jobId)) {
395 coordInfo = call(store);
396 }
397 else {
398 queueCallable(new CoordResumeCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL);
399 log.warn("CoordRerunCommand lock was not acquired - " + " failed " + jobId + ". Requeing the same.");
400 }
401 }
402 catch (InterruptedException e) {
403 queueCallable(new CoordResumeCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL);
404 log.warn("CoordRerunCommand lock acquiring failed " + " with exception " + e.getMessage() + " for job id "
405 + jobId + ". Requeing the same.");
406 }
407 finally {
408 log.info("ENDED CoordRerunCommand for jobId=" + jobId + ", scope=" + scope);
409 }
410 return coordInfo;
411 }
412
413 }