This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.ArrayList;
023 import java.util.Date;
024 import java.util.List;
025
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.oozie.BundleActionBean;
028 import org.apache.oozie.BundleJobBean;
029 import org.apache.oozie.CoordinatorActionBean;
030 import org.apache.oozie.CoordinatorJobBean;
031 import org.apache.oozie.ErrorCode;
032 import org.apache.oozie.WorkflowActionBean;
033 import org.apache.oozie.client.Job;
034 import org.apache.oozie.client.OozieClient;
035 import org.apache.oozie.command.CommandException;
036 import org.apache.oozie.command.coord.CoordActionInputCheckCommand;
037 import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
038 import org.apache.oozie.command.coord.CoordActionReadyCommand;
039 import org.apache.oozie.command.coord.CoordActionReadyXCommand;
040 import org.apache.oozie.command.coord.CoordActionStartCommand;
041 import org.apache.oozie.command.coord.CoordActionStartXCommand;
042 import org.apache.oozie.command.coord.CoordKillXCommand;
043 import org.apache.oozie.command.coord.CoordResumeXCommand;
044 import org.apache.oozie.command.coord.CoordSubmitXCommand;
045 import org.apache.oozie.command.coord.CoordSuspendXCommand;
046 import org.apache.oozie.command.wf.ActionEndCommand;
047 import org.apache.oozie.command.wf.ActionEndXCommand;
048 import org.apache.oozie.command.wf.ActionStartCommand;
049 import org.apache.oozie.command.wf.ActionStartXCommand;
050 import org.apache.oozie.command.wf.KillXCommand;
051 import org.apache.oozie.command.wf.ResumeXCommand;
052 import org.apache.oozie.command.wf.SignalCommand;
053 import org.apache.oozie.command.wf.SignalXCommand;
054 import org.apache.oozie.command.wf.SuspendXCommand;
055 import org.apache.oozie.executor.jpa.BundleActionsGetWaitingOlderJPAExecutor;
056 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
057 import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
058 import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
059 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
060 import org.apache.oozie.executor.jpa.JPAExecutorException;
061 import org.apache.oozie.executor.jpa.WorkflowActionsGetPendingJPAExecutor;
062 import org.apache.oozie.util.JobUtils;
063 import org.apache.oozie.util.XCallable;
064 import org.apache.oozie.util.XConfiguration;
065 import org.apache.oozie.util.XLog;
066 import org.apache.oozie.util.XmlUtils;
067 import org.jdom.Attribute;
068 import org.jdom.Element;
069 import org.jdom.JDOMException;
070
071 /**
072 * The Recovery Service checks for pending workflow actions, coordinator actions and bundle actions older than a
073 * configured age and then queues them for execution.
074 */
075 public class RecoveryService implements Service {
076
077 public static final String CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService.";
078 public static final String CONF_PREFIX_WF_ACTIONS = Service.CONF_PREFIX + "wf.actions.";
079 public static final String CONF_PREFIX_COORD = Service.CONF_PREFIX + "coord.";
080 public static final String CONF_PREFIX_BUNDLE = Service.CONF_PREFIX + "bundle.";
081 /**
082 * Time interval, in seconds, at which the recovery service will be scheduled to run.
083 */
084 public static final String CONF_SERVICE_INTERVAL = CONF_PREFIX + "interval";
085 /**
086 * The number of callables to be queued in a batch.
087 */
088 public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
089 /**
090 * Age of actions to queue, in seconds.
091 */
092 public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
093 /**
094 * Age of coordinator jobs to recover, in seconds.
095 */
096 public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than";
097
098 /**
099 * Age of Bundle jobs to recover, in seconds.
100 */
101 public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than";
102
103 private static final String INSTRUMENTATION_GROUP = "recovery";
104 private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
105 private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
106 private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
107
108 private static boolean useXCommand = true;
109
110
111 /**
112 * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
113 * queuing of commands.
114 */
115 static class RecoveryRunnable implements Runnable {
// Age thresholds passed to the workflow, coordinator and bundle recovery queries respectively.
private final long olderThan;
private final long coordOlderThan;
private final long bundleOlderThan;

// Largest delay requested so far for the pending batch of delayed callables.
private long delay;
// Pending batches; both stay null until the first callable is added.
private List<XCallable<?>> callables;
private List<XCallable<?>> delayedCallables;
// Summary of what the current run found; recreated at the start of every run.
private StringBuilder msg;
// Looked up from Services at the start of every run.
private JPAService jpaService;

/**
 * Creates the runnable with the age thresholds used by the recovery queries.
 *
 * @param olderThan age threshold for workflow actions.
 * @param coordOlderThan age threshold for coordinator actions.
 * @param bundleOlderThan age threshold for bundle actions.
 */
public RecoveryRunnable(long olderThan, long coordOlderThan, long bundleOlderThan) {
    this.bundleOlderThan = bundleOlderThan;
    this.coordOlderThan = coordOlderThan;
    this.olderThan = olderThan;
}
130
131 public void run() {
132 XLog.Info.get().clear();
133 XLog log = XLog.getLog(getClass());
134 msg = new StringBuilder();
135 jpaService = Services.get().get(JPAService.class);
136 runWFRecovery();
137 runCoordActionRecovery();
138 runCoordActionRecoveryForReady();
139 runBundleRecovery();
140 log.debug("QUEUING [{0}] for potential recovery", msg.toString());
141 boolean ret = false;
142 if (null != callables) {
143 ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
144 if (ret == false) {
145 log.warn("Unable to queue the callables commands for RecoveryService. "
146 + "Most possibly command queue is full. Queue size is :"
147 + Services.get().get(CallableQueueService.class).queueSize());
148 }
149 callables = null;
150 }
151 if (null != delayedCallables) {
152 ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
153 if (ret == false) {
154 log.warn("Unable to queue the delayedCallables commands for RecoveryService. "
155 + "Most possibly Callable queue is full. Queue size is :"
156 + Services.get().get(CallableQueueService.class).queueSize());
157 }
158 delayedCallables = null;
159 this.delay = 0;
160 }
161 }
162
163 private void runBundleRecovery(){
164 XLog.Info.get().clear();
165 XLog log = XLog.getLog(getClass());
166
167 try {
168 List<BundleActionBean> bactions = jpaService.execute(new BundleActionsGetWaitingOlderJPAExecutor(bundleOlderThan));
169 msg.append(", BUNDLE_ACTIONS : " + bactions.size());
170 for (BundleActionBean baction : bactions) {
171 Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
172 INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
173 if(baction.getStatus() == Job.Status.PREP){
174 BundleJobBean bundleJob = null;
175 try {
176 if (jpaService != null) {
177 bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(baction.getBundleId()));
178 }
179 if(bundleJob != null){
180 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
181 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
182 for (Element coordElem : coordElems) {
183 Attribute name = coordElem.getAttribute("name");
184 if (name.getValue().equals(baction.getCoordName())) {
185 Configuration coordConf = mergeConfig(coordElem,bundleJob);
186 coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
187 queueCallable(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(), bundleJob.getId(), name.getValue()));
188 }
189 }
190 }
191 }
192 catch (JDOMException jex) {
193 throw new CommandException(ErrorCode.E1301, jex);
194 }
195 catch (JPAExecutorException je) {
196 throw new CommandException(je);
197 }
198 }
199 else if(baction.getStatus() == Job.Status.KILLED){
200 queueCallable(new CoordKillXCommand(baction.getCoordId()));
201 }
202 else if(baction.getStatus() == Job.Status.SUSPENDED){
203 queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
204 }
205 else if(baction.getStatus() == Job.Status.RUNNING){
206 queueCallable(new CoordResumeXCommand(baction.getCoordId()));
207 }
208 }
209 }
210 catch (Exception ex) {
211 log.error("Exception, {0}", ex.getMessage(), ex);
212 }
213 }
214
215 /**
216 * Recover coordinator actions that are staying in WAITING or SUBMITTED too long
217 */
218 private void runCoordActionRecovery() {
219 XLog.Info.get().clear();
220 XLog log = XLog.getLog(getClass());
221
222 try {
223 List<CoordinatorActionBean> cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
224 msg.append(", COORD_ACTIONS : " + cactions.size());
225 for (CoordinatorActionBean caction : cactions) {
226 Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
227 INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
228 if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
229 if (useXCommand) {
230 queueCallable(new CoordActionInputCheckXCommand(caction.getId()));
231 } else {
232 queueCallable(new CoordActionInputCheckCommand(caction.getId()));
233 }
234
235 log.info("Recover a WAITTING coord action and resubmit CoordActionInputCheckXCommand :" + caction.getId());
236 }
237 else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) {
238 CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(caction.getJobId()));
239
240 if (useXCommand) {
241 queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(), coordJob
242 .getAuthToken()));
243 } else {
244 queueCallable(new CoordActionStartCommand(caction.getId(), coordJob.getUser(), coordJob
245 .getAuthToken()));
246 }
247
248 log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :" + caction.getId());
249 }
250 else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) {
251 if (caction.getExternalId() != null) {
252 queueCallable(new SuspendXCommand(caction.getExternalId()));
253 log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :" + caction.getId());
254 }
255 }
256 else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) {
257 if (caction.getExternalId() != null) {
258 queueCallable(new KillXCommand(caction.getExternalId()));
259 log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId());
260 }
261 }
262 else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) {
263 if (caction.getExternalId() != null) {
264 queueCallable(new ResumeXCommand(caction.getExternalId()));
265 log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
266 }
267 }
268 }
269 }
270 catch (Exception ex) {
271 log.error("Exception, {0}", ex.getMessage(), ex);
272 }
273 }
274
275 /**
276 * Recover coordinator actions that are staying in READY too long
277 */
278 private void runCoordActionRecoveryForReady() {
279 XLog.Info.get().clear();
280 XLog log = XLog.getLog(getClass());
281
282 try {
283 List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan));
284 msg.append(", COORD_READY_JOBS : " + jobids.size());
285 for (String jobid : jobids) {
286 if (useXCommand) {
287 queueCallable(new CoordActionReadyXCommand(jobid));
288 } else {
289 queueCallable(new CoordActionReadyCommand(jobid));
290 }
291
292 log.info("Recover READY coord actions for jobid :" + jobid);
293 }
294 }
295 catch (Exception ex) {
296 log.error("Exception, {0}", ex.getMessage(), ex);
297 }
298 }
299
300 /**
301 * Recover wf actions
302 */
303 private void runWFRecovery() {
304 XLog.Info.get().clear();
305 XLog log = XLog.getLog(getClass());
306 // queue command for action recovery
307 try {
308 List<WorkflowActionBean> actions = null;
309 try {
310 actions = jpaService.execute(new WorkflowActionsGetPendingJPAExecutor(olderThan));
311 }
312 catch (JPAExecutorException ex) {
313 log.warn("Exception while reading pending actions from storage", ex);
314 }
315 //log.debug("QUEUING[{0}] pending wf actions for potential recovery", actions.size());
316 msg.append(" WF_ACTIONS " + actions.size());
317
318 for (WorkflowActionBean action : actions) {
319 Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
320 INSTR_RECOVERED_ACTIONS_COUNTER, 1);
321 if (action.getStatus() == WorkflowActionBean.Status.PREP
322 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
323
324 if (useXCommand) {
325 queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
326 } else {
327 queueCallable(new ActionStartCommand(action.getId(), action.getType()));
328 }
329
330 }
331 else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
332 Date nextRunTime = action.getPendingAge();
333 if (useXCommand) {
334 queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime()
335 - System.currentTimeMillis());
336 } else {
337 queueCallable(new ActionStartCommand(action.getId(), action.getType()), nextRunTime.getTime()
338 - System.currentTimeMillis());
339 }
340
341 }
342 else if (action.getStatus() == WorkflowActionBean.Status.DONE
343 || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
344 if (useXCommand) {
345 queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
346 } else {
347 queueCallable(new ActionEndCommand(action.getId(), action.getType()));
348 }
349
350 }
351 else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
352 Date nextRunTime = action.getPendingAge();
353 if (useXCommand) {
354 queueCallable(new ActionEndXCommand(action.getId(), action.getType()), nextRunTime.getTime()
355 - System.currentTimeMillis());
356 } else {
357 queueCallable(new ActionEndCommand(action.getId(), action.getType()), nextRunTime.getTime()
358 - System.currentTimeMillis());
359 }
360
361 }
362 else if (action.getStatus() == WorkflowActionBean.Status.OK
363 || action.getStatus() == WorkflowActionBean.Status.ERROR) {
364 if (useXCommand) {
365 queueCallable(new SignalXCommand(action.getJobId(), action.getId()));
366 } else {
367 queueCallable(new SignalCommand(action.getJobId(), action.getId()));
368 }
369
370 }
371 else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
372 queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
373 }
374 }
375 }
376 catch (Exception ex) {
377 log.error("Exception, {0}", ex.getMessage(), ex);
378 }
379 }
380
381 /**
382 * Adds callables to a list. If the number of callables in the list reaches {@link
383 * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset.
384 *
385 * @param callable the callable to queue.
386 */
387 private void queueCallable(XCallable<?> callable) {
388 if (callables == null) {
389 callables = new ArrayList<XCallable<?>>();
390 }
391 callables.add(callable);
392 if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
393 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
394 if (ret == false) {
395 XLog.getLog(getClass()).warn(
396 "Unable to queue the callables commands for RecoveryService. "
397 + "Most possibly command queue is full. Queue size is :"
398 + Services.get().get(CallableQueueService.class).queueSize());
399 }
400 callables = new ArrayList<XCallable<?>>();
401 }
402 }
403
404 /**
405 * Adds callables to a list. If the number of callables in the list reaches {@link
406 * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay
407 * of the callables in the list. The callables list and the delay is reset.
408 *
409 * @param callable the callable to queue.
410 * @param delay the delay for the callable.
411 */
412 private void queueCallable(XCallable<?> callable, long delay) {
413 if (delayedCallables == null) {
414 delayedCallables = new ArrayList<XCallable<?>>();
415 }
416 this.delay = Math.max(this.delay, delay);
417 delayedCallables.add(callable);
418 if (delayedCallables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
419 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
420 if (ret == false) {
421 XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. "
422 + "Most possibly Callable queue is full. Queue size is :"
423 + Services.get().get(CallableQueueService.class).queueSize());
424 }
425 delayedCallables = new ArrayList<XCallable<?>>();
426 this.delay = 0;
427 }
428 }
429 }
430
431 /**
432 * Initializes the RecoveryService.
433 *
434 * @param services services instance.
435 */
436 @Override
437 public void init(Services services) {
438 Configuration conf = services.getConf();
439 Runnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120), conf.getInt(
440 CONF_COORD_OLDER_THAN, 600),conf.getInt(CONF_BUNDLE_OLDER_THAN, 600));
441 services.get(SchedulerService.class).schedule(recoveryRunnable, 10, conf.getInt(CONF_SERVICE_INTERVAL, 600),
442 SchedulerService.Unit.SEC);
443
444 if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
445 useXCommand = false;
446 }
447 }
448
449 /**
450 * Destroy the Recovery Service.
451 */
452 @Override
453 public void destroy() {
454 }
455
456 /**
457 * Return the public interface for the Recovery Service.
458 *
459 * @return {@link RecoveryService}.
460 */
461 @Override
462 public Class<? extends Service> getInterface() {
463 return RecoveryService.class;
464 }
465
466 /**
467 * Merge Bundle job config and the configuration from the coord job to pass
468 * to Coord Engine
469 *
470 * @param coordElem the coordinator configuration
471 * @return Configuration merged configuration
472 * @throws CommandException thrown if failed to merge configuration
473 */
474 private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException {
475 XLog.Info.get().clear();
476 XLog log = XLog.getLog("RecoveryService");
477
478 String jobConf = bundleJob.getConf();
479 // Step 1: runConf = jobConf
480 Configuration runConf = null;
481 try {
482 runConf = new XConfiguration(new StringReader(jobConf));
483 }
484 catch (IOException e1) {
485 log.warn("Configuration parse error in:" + jobConf);
486 throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
487 }
488 // Step 2: Merge local properties into runConf
489 // extract 'property' tags under 'configuration' block in the coordElem
490 // convert Element to XConfiguration
491 Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
492
493 if (localConfigElement != null) {
494 String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
495 Configuration localConf;
496 try {
497 localConf = new XConfiguration(new StringReader(strConfig));
498 }
499 catch (IOException e1) {
500 log.warn("Configuration parse error in:" + strConfig);
501 throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
502 }
503
504 // copy configuration properties in the coordElem to the runConf
505 XConfiguration.copy(localConf, runConf);
506 }
507
508 // Step 3: Extract value of 'app-path' in coordElem, save it as a
509 // new property called 'oozie.coord.application.path', and normalize.
510 String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
511 runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
512 // Normalize coordinator appPath here;
513 try {
514 JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
515 }
516 catch (IOException e) {
517 throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
518 }
519 return runConf;
520 }
521 }