This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.Date;
023 import java.util.List;
024 import org.apache.hadoop.conf.Configuration;
025 import org.apache.hadoop.fs.Path;
026 import org.apache.oozie.CoordinatorActionBean;
027 import org.apache.oozie.CoordinatorActionInfo;
028 import org.apache.oozie.CoordinatorJobBean;
029 import org.apache.oozie.ErrorCode;
030 import org.apache.oozie.SLAEventBean;
031 import org.apache.oozie.XException;
032 import org.apache.oozie.action.ActionExecutorException;
033 import org.apache.oozie.action.hadoop.FsActionExecutor;
034 import org.apache.oozie.client.CoordinatorAction;
035 import org.apache.oozie.client.CoordinatorJob;
036 import org.apache.oozie.client.Job;
037 import org.apache.oozie.client.SLAEvent.SlaAppType;
038 import org.apache.oozie.client.rest.RestConstants;
039 import org.apache.oozie.command.CommandException;
040 import org.apache.oozie.command.PreconditionException;
041 import org.apache.oozie.command.RerunTransitionXCommand;
042 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
043 import org.apache.oozie.coord.CoordELFunctions;
044 import org.apache.oozie.coord.CoordUtils;
045 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
046 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
047 import org.apache.oozie.executor.jpa.JPAExecutorException;
048 import org.apache.oozie.service.EventHandlerService;
049 import org.apache.oozie.service.JPAService;
050 import org.apache.oozie.service.Services;
051 import org.apache.oozie.sla.SLAOperations;
052 import org.apache.oozie.sla.service.SLAService;
053 import org.apache.oozie.util.InstrumentUtils;
054 import org.apache.oozie.util.LogUtils;
055 import org.apache.oozie.util.ParamChecker;
056 import org.apache.oozie.util.StatusUtils;
057 import org.apache.oozie.util.XConfiguration;
058 import org.apache.oozie.util.XLog;
059 import org.apache.oozie.util.XmlUtils;
060 import org.apache.oozie.util.db.SLADbOperations;
061 import org.jdom.Element;
062 import org.jdom.JDOMException;
063
064 /**
065 * Rerun coordinator actions by a list of dates or ids. User can specify if refresh or noCleanup.
066 * <p/>
067 * The "rerunType" can be set as {@link RestConstants.JOB_COORD_RERUN_DATE} or
068 * {@link RestConstants.JOB_COORD_RERUN_ACTION}.
069 * <p/>
070 * The "refresh" is used to indicate if user wants to refresh an action's input and output events.
071 * <p/>
072 * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions
073 */
074 @SuppressWarnings("deprecation")
075 public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> {
076
077 private String rerunType;
078 private String scope;
079 private boolean refresh;
080 private boolean noCleanup;
081 private CoordinatorJobBean coordJob = null;
082 private JPAService jpaService = null;
083 protected boolean prevPending;
084
085 /**
086 * The constructor for class {@link CoordRerunXCommand}
087 *
088 * @param jobId the job id
089 * @param rerunType rerun type {@link RestConstants.JOB_COORD_RERUN_DATE} or {@link RestConstants.JOB_COORD_RERUN_ACTION}
090 * @param scope the rerun scope for given rerunType separated by ","
091 * @param refresh true if user wants to refresh input/output dataset urls
092 * @param noCleanup false if user wants to cleanup output events for given rerun actions
093 */
094 public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
095 super("coord_rerun", "coord_rerun", 1);
096 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
097 this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType");
098 this.scope = ParamChecker.notEmpty(scope, "scope");
099 this.refresh = refresh;
100 this.noCleanup = noCleanup;
101 }
102
103 /**
104 * Check if all given actions are eligible to rerun.
105 *
106 * @param actions list of CoordinatorActionBean
107 * @return true if all actions are eligible to rerun
108 */
109 private static boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) {
110 ParamChecker.notNull(coordActions, "Coord actions to be rerun");
111 boolean ret = false;
112 for (CoordinatorActionBean coordAction : coordActions) {
113 ret = true;
114 if (!coordAction.isTerminalStatus()) {
115 ret = false;
116 break;
117 }
118 }
119 return ret;
120 }
121
122 /**
123 * Get the list of actions for a given coordinator job
124 * @param rerunType the rerun type (date, action)
125 * @param jobId the coordinator job id
126 * @param scope the date scope or action id scope
127 * @return the list of Coordinator actions
128 * @throws CommandException
129 */
130 public static List<CoordinatorActionBean> getCoordActions(String rerunType, String jobId, String scope) throws CommandException{
131 List<CoordinatorActionBean> coordActions = null;
132 if (rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE)) {
133 coordActions = CoordUtils.getCoordActionsFromDates(jobId, scope);
134 }
135 else if (rerunType.equals(RestConstants.JOB_COORD_RERUN_ACTION)) {
136 coordActions = CoordUtils.getCoordActionsFromIds(jobId, scope);
137 }
138 return coordActions;
139 }
140
141 /**
142 * Cleanup output-events directories
143 *
144 * @param eAction coordinator action xml
145 * @param user user name
146 * @param group group name
147 */
148 @SuppressWarnings("unchecked")
149 private void cleanupOutputEvents(Element eAction, String user, String group) {
150 Element outputList = eAction.getChild("output-events", eAction.getNamespace());
151 if (outputList != null) {
152 for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
153 if (data.getChild("uris", data.getNamespace()) != null) {
154 String uris = data.getChild("uris", data.getNamespace()).getTextTrim();
155 if (uris != null) {
156 String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR);
157 FsActionExecutor fsAe = new FsActionExecutor();
158 for (String uri : uriArr) {
159 Path path = new Path(uri);
160 try {
161 fsAe.delete(user, group, path);
162 LOG.debug("Cleanup the output dir " + path);
163 }
164 catch (ActionExecutorException ae) {
165 LOG.warn("Failed to cleanup the output dir " + uri, ae);
166 }
167 }
168 }
169
170 }
171 }
172 }
173 else {
174 LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup");
175 }
176 }
177
178 /**
179 * Refresh an action's input and ouput events.
180 *
181 * @param coordJob coordinator job bean
182 * @param coordAction coordinator action bean
183 * @throws Exception thrown if failed to materialize coordinator action
184 */
185 private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception {
186 Configuration jobConf = null;
187 try {
188 jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
189 }
190 catch (IOException ioe) {
191 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
192 throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
193 }
194 String jobXml = coordJob.getJobXml();
195 Element eJob = XmlUtils.parseXml(jobXml);
196 Date actualTime = new Date();
197 String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction
198 .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction);
199 LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml="
200 + XmlUtils.prettyPrint(actionXml).toString());
201 coordAction.setActionXml(actionXml);
202 }
203
204 /**
205 * Update an action into database table
206 *
207 * @param coordJob coordinator job bean
208 * @param coordAction coordinator action bean
209 * @param actionXml coordinator action xml
210 * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event
211 */
212 private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml)
213 throws Exception {
214 LOG.debug("updateAction for actionId=" + coordAction.getId());
215 if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) {
216 LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId());
217 coordAction.setCreatedTime(new Date());
218 }
219 coordAction.setStatus(CoordinatorAction.Status.WAITING);
220 coordAction.setExternalId("");
221 coordAction.setExternalStatus("");
222 coordAction.setRerunTime(new Date());
223 coordAction.setLastModifiedTime(new Date());
224 updateList.add(coordAction);
225 writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
226 }
227
228 /**
229 * Create SLA RegistrationEvent
230 *
231 * @param actionXml action xml
232 * @param actionBean coordinator action bean
233 * @param user user name
234 * @param group group name
235 * @throws Exception thrown if unable to write sla registration event
236 */
237 private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user, String group)
238 throws Exception {
239 Element eAction = XmlUtils.parseXml(actionXml);
240 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
241 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(),
242 SlaAppType.COORDINATOR_ACTION, user, group, LOG);
243 if(slaEvent != null) {
244 insertList.add(slaEvent);
245 }
246 }
247
248 /* (non-Javadoc)
249 * @see org.apache.oozie.command.XCommand#getEntityKey()
250 */
251 @Override
252 public String getEntityKey() {
253 return jobId;
254 }
255
256 /* (non-Javadoc)
257 * @see org.apache.oozie.command.XCommand#isLockRequired()
258 */
259 @Override
260 protected boolean isLockRequired() {
261 return true;
262 }
263
264 /* (non-Javadoc)
265 * @see org.apache.oozie.command.XCommand#loadState()
266 */
267 @Override
268 protected void loadState() throws CommandException {
269 jpaService = Services.get().get(JPAService.class);
270 if (jpaService == null) {
271 throw new CommandException(ErrorCode.E0610);
272 }
273 try {
274 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
275 prevPending = coordJob.isPending();
276 }
277 catch (JPAExecutorException je) {
278 throw new CommandException(je);
279 }
280 LogUtils.setLogInfo(coordJob, logInfo);
281 }
282
283 /* (non-Javadoc)
284 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
285 */
286 @Override
287 protected void verifyPrecondition() throws CommandException, PreconditionException {
288 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, coordJob.getStatus());
289 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED
290 || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
291 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
292 // Call the parent so the pending flag is reset and state transition
293 // of bundle can happen
294 if (coordJob.getBundleId() != null) {
295 bundleStatusUpdate.call();
296 }
297 throw new CommandException(ErrorCode.E1018,
298 "coordinator job is killed or failed so all actions are not eligible to rerun!");
299 }
300
301 // no actioins have been created for PREP job
302 if (coordJob.getStatus() == CoordinatorJob.Status.PREP) {
303 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
304 // Call the parent so the pending flag is reset and state transition
305 // of bundle can happen
306 if (coordJob.getBundleId() != null) {
307 bundleStatusUpdate.call();
308 }
309 throw new CommandException(ErrorCode.E1018,
310 "coordinator job is PREP so no actions are materialized to rerun!");
311 }
312 }
313
314 @Override
315 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
316 verifyPrecondition();
317 }
318
319 @Override
320 public void rerunChildren() throws CommandException {
321 boolean isError = false;
322 try {
323 CoordinatorActionInfo coordInfo = null;
324 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
325 List<CoordinatorActionBean> coordActions = getCoordActions(rerunType, jobId, scope);
326 if (checkAllActionsRunnable(coordActions)) {
327 for (CoordinatorActionBean coordAction : coordActions) {
328 String actionXml = coordAction.getActionXml();
329 if (!noCleanup) {
330 Element eAction = XmlUtils.parseXml(actionXml);
331 cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup());
332 }
333 if (refresh) {
334 refreshAction(coordJob, coordAction);
335 }
336 updateAction(coordJob, coordAction, actionXml);
337 if (SLAService.isEnabled()) {
338 SLAOperations.updateRegistrationEvent(coordAction.getId());
339 }
340 queue(new CoordActionNotificationXCommand(coordAction), 100);
341 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100);
342 }
343 }
344 else {
345 isError = true;
346 throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!");
347 }
348 coordInfo = new CoordinatorActionInfo(coordActions);
349
350 ret = coordInfo;
351 }
352 catch (XException xex) {
353 isError = true;
354 throw new CommandException(xex);
355 }
356 catch (JDOMException jex) {
357 isError = true;
358 throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex);
359 }
360 catch (Exception ex) {
361 isError = true;
362 throw new CommandException(ErrorCode.E1018, ex.getMessage(), ex);
363 }
364 finally{
365 if(isError){
366 transitToPrevious();
367 }
368 }
369 }
370
371 /*
372 * (non-Javadoc)
373 * @see org.apache.oozie.command.TransitionXCommand#getJob()
374 */
375 @Override
376 public Job getJob() {
377 return coordJob;
378 }
379
380 @Override
381 public void notifyParent() throws CommandException {
382 //update bundle action
383 if (getPrevStatus() != null && coordJob.getBundleId() != null) {
384 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus());
385 bundleStatusUpdate.call();
386 }
387 }
388
389 @Override
390 public void updateJob() {
391 if (getPrevStatus()!= null){
392 Job.Status coordJobStatus = getPrevStatus();
393 if(coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
394 coordJob.setStatus(coordJobStatus);
395 }
396 if (prevPending) {
397 coordJob.setPending();
398 } else {
399 coordJob.resetPending();
400 }
401 }
402 updateList.add(coordJob);
403 }
404
405 /* (non-Javadoc)
406 * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
407 */
408 @Override
409 public void performWrites() throws CommandException {
410 try {
411 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
412 if (EventHandlerService.isEnabled()) {
413 generateEvents(coordJob);
414 }
415 }
416 catch (JPAExecutorException e) {
417 throw new CommandException(e);
418 }
419 }
420
421 /* (non-Javadoc)
422 * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
423 */
424 @Override
425 public XLog getLog() {
426 return LOG;
427 }
428
429 @Override
430 public final void transitToNext() {
431 prevStatus = coordJob.getStatus();
432 if (prevStatus == CoordinatorJob.Status.SUCCEEDED || prevStatus == CoordinatorJob.Status.PAUSED
433 || prevStatus == CoordinatorJob.Status.SUSPENDED || prevStatus == CoordinatorJob.Status.RUNNING) {
434 coordJob.setStatus(Job.Status.RUNNING);
435 }
436 else {
437 // Check for backward compatibility for Oozie versions (3.2 and before)
438 // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
439 // PAUSEDWITHERROR is not supported
440 coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
441 }
442 // used for backward support of coordinator 0.1 schema
443 coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus));
444 coordJob.setPending();
445 }
446
447 private final void transitToPrevious() throws CommandException {
448 coordJob.setStatus(getPrevStatus());
449 if (!prevPending) {
450 coordJob.resetPending();
451 }
452 else {
453 coordJob.setPending();
454 }
455 }
456 }