This project has been retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.Date;
023 import java.util.List;
024 import org.apache.hadoop.conf.Configuration;
025 import org.apache.hadoop.fs.Path;
026 import org.apache.oozie.CoordinatorActionBean;
027 import org.apache.oozie.CoordinatorActionInfo;
028 import org.apache.oozie.CoordinatorJobBean;
029 import org.apache.oozie.ErrorCode;
030 import org.apache.oozie.SLAEventBean;
031 import org.apache.oozie.XException;
032 import org.apache.oozie.action.ActionExecutorException;
033 import org.apache.oozie.action.hadoop.FsActionExecutor;
034 import org.apache.oozie.client.CoordinatorAction;
035 import org.apache.oozie.client.CoordinatorJob;
036 import org.apache.oozie.client.Job;
037 import org.apache.oozie.client.SLAEvent.SlaAppType;
038 import org.apache.oozie.client.rest.RestConstants;
039 import org.apache.oozie.command.CommandException;
040 import org.apache.oozie.command.PreconditionException;
041 import org.apache.oozie.command.RerunTransitionXCommand;
042 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
043 import org.apache.oozie.coord.CoordELFunctions;
044 import org.apache.oozie.coord.CoordUtils;
045 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
046 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
047 import org.apache.oozie.executor.jpa.JPAExecutorException;
048 import org.apache.oozie.service.JPAService;
049 import org.apache.oozie.service.Services;
050 import org.apache.oozie.util.InstrumentUtils;
051 import org.apache.oozie.util.LogUtils;
052 import org.apache.oozie.util.ParamChecker;
053 import org.apache.oozie.util.StatusUtils;
054 import org.apache.oozie.util.XConfiguration;
055 import org.apache.oozie.util.XLog;
056 import org.apache.oozie.util.XmlUtils;
057 import org.apache.oozie.util.db.SLADbOperations;
058 import org.jdom.Element;
059 import org.jdom.JDOMException;
060
061 /**
062 * Rerun coordinator actions by a list of dates or ids. User can specify if refresh or noCleanup.
063 * <p/>
064 * The "rerunType" can be set as {@link RestConstants.JOB_COORD_RERUN_DATE} or
065 * {@link RestConstants.JOB_COORD_RERUN_ACTION}.
066 * <p/>
067 * The "refresh" is used to indicate if user wants to refresh an action's input and output events.
068 * <p/>
069 * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions
070 */
071 public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> {
072
073 private String rerunType;
074 private String scope;
075 private boolean refresh;
076 private boolean noCleanup;
077 private CoordinatorJobBean coordJob = null;
078 private JPAService jpaService = null;
079 protected boolean prevPending;
080
081 /**
082 * The constructor for class {@link CoordRerunXCommand}
083 *
084 * @param jobId the job id
085 * @param rerunType rerun type {@link RestConstants.JOB_COORD_RERUN_DATE} or {@link RestConstants.JOB_COORD_RERUN_ACTION}
086 * @param scope the rerun scope for given rerunType separated by ","
087 * @param refresh true if user wants to refresh input/output dataset urls
088 * @param noCleanup false if user wants to cleanup output events for given rerun actions
089 */
090 public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
091 super("coord_rerun", "coord_rerun", 1);
092 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
093 this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType");
094 this.scope = ParamChecker.notEmpty(scope, "scope");
095 this.refresh = refresh;
096 this.noCleanup = noCleanup;
097 }
098
099 /**
100 * Check if all given actions are eligible to rerun.
101 *
102 * @param actions list of CoordinatorActionBean
103 * @return true if all actions are eligible to rerun
104 */
105 private static boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) {
106 ParamChecker.notNull(coordActions, "Coord actions to be rerun");
107 boolean ret = false;
108 for (CoordinatorActionBean coordAction : coordActions) {
109 ret = true;
110 if (!coordAction.isTerminalStatus()) {
111 ret = false;
112 break;
113 }
114 }
115 return ret;
116 }
117
118 /**
119 * Get the list of actions for a given coordinator job
120 * @param rerunType the rerun type (date, action)
121 * @param jobId the coordinator job id
122 * @param scope the date scope or action id scope
123 * @return the list of Coordinator actions
124 * @throws CommandException
125 */
126 public static List<CoordinatorActionBean> getCoordActions(String rerunType, String jobId, String scope) throws CommandException{
127 List<CoordinatorActionBean> coordActions = null;
128 if (rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE)) {
129 coordActions = CoordUtils.getCoordActionsFromDates(jobId, scope);
130 }
131 else if (rerunType.equals(RestConstants.JOB_COORD_RERUN_ACTION)) {
132 coordActions = CoordUtils.getCoordActionsFromIds(jobId, scope);
133 }
134 return coordActions;
135 }
136
137 /**
138 * Cleanup output-events directories
139 *
140 * @param eAction coordinator action xml
141 * @param user user name
142 * @param group group name
143 */
144 @SuppressWarnings("unchecked")
145 private void cleanupOutputEvents(Element eAction, String user, String group) {
146 Element outputList = eAction.getChild("output-events", eAction.getNamespace());
147 if (outputList != null) {
148 for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
149 if (data.getChild("uris", data.getNamespace()) != null) {
150 String uris = data.getChild("uris", data.getNamespace()).getTextTrim();
151 if (uris != null) {
152 String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR);
153 FsActionExecutor fsAe = new FsActionExecutor();
154 for (String uri : uriArr) {
155 Path path = new Path(uri);
156 try {
157 fsAe.delete(user, group, path);
158 LOG.debug("Cleanup the output dir " + path);
159 }
160 catch (ActionExecutorException ae) {
161 LOG.warn("Failed to cleanup the output dir " + uri, ae);
162 }
163 }
164 }
165
166 }
167 }
168 }
169 else {
170 LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup");
171 }
172 }
173
174 /**
175 * Refresh an action's input and ouput events.
176 *
177 * @param coordJob coordinator job bean
178 * @param coordAction coordinator action bean
179 * @throws Exception thrown if failed to materialize coordinator action
180 */
181 private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception {
182 Configuration jobConf = null;
183 try {
184 jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
185 }
186 catch (IOException ioe) {
187 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
188 throw new CommandException(ErrorCode.E1005, ioe);
189 }
190 String jobXml = coordJob.getJobXml();
191 Element eJob = XmlUtils.parseXml(jobXml);
192 Date actualTime = new Date();
193 String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction
194 .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction);
195 LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml="
196 + XmlUtils.prettyPrint(actionXml).toString());
197 coordAction.setActionXml(actionXml);
198 }
199
200 /**
201 * Update an action into database table
202 *
203 * @param coordJob coordinator job bean
204 * @param coordAction coordinator action bean
205 * @param actionXml coordinator action xml
206 * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event
207 */
208 private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml)
209 throws Exception {
210 LOG.debug("updateAction for actionId=" + coordAction.getId());
211 if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) {
212 LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId());
213 coordAction.setCreatedTime(new Date());
214 }
215 coordAction.setStatus(CoordinatorAction.Status.WAITING);
216 coordAction.setExternalId("");
217 coordAction.setExternalStatus("");
218 coordAction.setRerunTime(new Date());
219 coordAction.setLastModifiedTime(new Date());
220 updateList.add(coordAction);
221 writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
222 }
223
224 /**
225 * Create SLA RegistrationEvent
226 *
227 * @param actionXml action xml
228 * @param actionBean coordinator action bean
229 * @param user user name
230 * @param group group name
231 * @throws Exception thrown if unable to write sla registration event
232 */
233 private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user, String group)
234 throws Exception {
235 Element eAction = XmlUtils.parseXml(actionXml);
236 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
237 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(),
238 SlaAppType.COORDINATOR_ACTION, user, group, LOG);
239 if(slaEvent != null) {
240 insertList.add(slaEvent);
241 }
242 }
243
244 /* (non-Javadoc)
245 * @see org.apache.oozie.command.XCommand#getEntityKey()
246 */
247 @Override
248 public String getEntityKey() {
249 return jobId;
250 }
251
252 /* (non-Javadoc)
253 * @see org.apache.oozie.command.XCommand#isLockRequired()
254 */
255 @Override
256 protected boolean isLockRequired() {
257 return true;
258 }
259
260 /* (non-Javadoc)
261 * @see org.apache.oozie.command.XCommand#loadState()
262 */
263 @Override
264 protected void loadState() throws CommandException {
265 jpaService = Services.get().get(JPAService.class);
266 if (jpaService == null) {
267 throw new CommandException(ErrorCode.E0610);
268 }
269 try {
270 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
271 prevPending = coordJob.isPending();
272 }
273 catch (JPAExecutorException je) {
274 throw new CommandException(je);
275 }
276 LogUtils.setLogInfo(coordJob, logInfo);
277 }
278
279 /* (non-Javadoc)
280 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
281 */
282 @Override
283 protected void verifyPrecondition() throws CommandException, PreconditionException {
284 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED
285 || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
286 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
287 throw new CommandException(ErrorCode.E1018,
288 "coordinator job is killed or failed so all actions are not eligible to rerun!");
289 }
290
291 // no actioins have been created for PREP job
292 if (coordJob.getStatus() == CoordinatorJob.Status.PREP) {
293 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
294 throw new CommandException(ErrorCode.E1018,
295 "coordinator job is PREP so no actions are materialized to rerun!");
296 }
297 }
298
299 @Override
300 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
301 verifyPrecondition();
302 }
303
304 @Override
305 public void rerunChildren() throws CommandException {
306 boolean isError = false;
307 try {
308 CoordinatorActionInfo coordInfo = null;
309 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
310 List<CoordinatorActionBean> coordActions = getCoordActions(rerunType, jobId, scope);
311 if (checkAllActionsRunnable(coordActions)) {
312 for (CoordinatorActionBean coordAction : coordActions) {
313 String actionXml = coordAction.getActionXml();
314 if (!noCleanup) {
315 Element eAction = XmlUtils.parseXml(actionXml);
316 cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup());
317 }
318 if (refresh) {
319 refreshAction(coordJob, coordAction);
320 }
321 updateAction(coordJob, coordAction, actionXml);
322
323 queue(new CoordActionNotificationXCommand(coordAction), 100);
324 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100);
325 }
326 }
327 else {
328 isError = true;
329 throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!");
330 }
331 coordInfo = new CoordinatorActionInfo(coordActions);
332
333 ret = coordInfo;
334 }
335 catch (XException xex) {
336 isError = true;
337 throw new CommandException(xex);
338 }
339 catch (JDOMException jex) {
340 isError = true;
341 throw new CommandException(ErrorCode.E0700, jex);
342 }
343 catch (Exception ex) {
344 isError = true;
345 throw new CommandException(ErrorCode.E1018, ex);
346 }
347 finally{
348 if(isError){
349 transitToPrevious();
350 }
351 }
352 }
353
354 /*
355 * (non-Javadoc)
356 * @see org.apache.oozie.command.TransitionXCommand#getJob()
357 */
358 @Override
359 public Job getJob() {
360 return coordJob;
361 }
362
363 @Override
364 public void notifyParent() throws CommandException {
365 //update bundle action
366 if (getPrevStatus() != null && coordJob.getBundleId() != null) {
367 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus());
368 bundleStatusUpdate.call();
369 }
370 }
371
372 @Override
373 public void updateJob() {
374 if (getPrevStatus()!= null){
375 Job.Status coordJobStatus = getPrevStatus();
376 if(coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
377 coordJob.setStatus(coordJobStatus);
378 }
379 if (prevPending) {
380 coordJob.setPending();
381 } else {
382 coordJob.resetPending();
383 }
384 }
385
386 updateList.add(coordJob);
387 }
388
389 /* (non-Javadoc)
390 * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
391 */
392 @Override
393 public void performWrites() throws CommandException {
394 try {
395 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
396 }
397 catch (JPAExecutorException e) {
398 throw new CommandException(e);
399 }
400 }
401
402 /* (non-Javadoc)
403 * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
404 */
405 @Override
406 public XLog getLog() {
407 return LOG;
408 }
409
410 @Override
411 public final void transitToNext() {
412 prevStatus = coordJob.getStatus();
413 if (prevStatus == CoordinatorJob.Status.SUCCEEDED || prevStatus == CoordinatorJob.Status.PAUSED
414 || prevStatus == CoordinatorJob.Status.SUSPENDED || prevStatus == CoordinatorJob.Status.RUNNING) {
415 coordJob.setStatus(Job.Status.RUNNING);
416 }
417 else {
418 // Check for backward compatibility for Oozie versions (3.2 and before)
419 // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
420 // PAUSEDWITHERROR is not supported
421 coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
422 }
423 // used for backward support of coordinator 0.1 schema
424 coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus));
425 coordJob.setPending();
426 }
427
428 private final void transitToPrevious() throws CommandException {
429 coordJob.setStatus(getPrevStatus());
430 if (!prevPending) {
431 coordJob.resetPending();
432 }
433 else {
434 coordJob.setPending();
435 }
436 }
437 }