This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.ArrayList;
023 import java.util.Date;
024 import java.util.List;
025
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.oozie.BundleActionBean;
028 import org.apache.oozie.BundleJobBean;
029 import org.apache.oozie.CoordinatorActionBean;
030 import org.apache.oozie.CoordinatorJobBean;
031 import org.apache.oozie.ErrorCode;
032 import org.apache.oozie.WorkflowActionBean;
033 import org.apache.oozie.client.Job;
034 import org.apache.oozie.client.OozieClient;
035 import org.apache.oozie.command.CommandException;
036 import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
037 import org.apache.oozie.command.coord.CoordActionReadyXCommand;
038 import org.apache.oozie.command.coord.CoordActionStartXCommand;
039 import org.apache.oozie.command.coord.CoordKillXCommand;
040 import org.apache.oozie.command.coord.CoordResumeXCommand;
041 import org.apache.oozie.command.coord.CoordSubmitXCommand;
042 import org.apache.oozie.command.coord.CoordSuspendXCommand;
043 import org.apache.oozie.command.wf.ActionEndXCommand;
044 import org.apache.oozie.command.wf.ActionStartXCommand;
045 import org.apache.oozie.command.wf.KillXCommand;
046 import org.apache.oozie.command.wf.ResumeXCommand;
047 import org.apache.oozie.command.wf.SignalXCommand;
048 import org.apache.oozie.command.wf.SuspendXCommand;
049 import org.apache.oozie.executor.jpa.BundleActionsGetWaitingOlderJPAExecutor;
050 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
051 import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
052 import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
053 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
054 import org.apache.oozie.executor.jpa.JPAExecutorException;
055 import org.apache.oozie.executor.jpa.WorkflowActionsGetPendingJPAExecutor;
056 import org.apache.oozie.util.JobUtils;
057 import org.apache.oozie.util.XCallable;
058 import org.apache.oozie.util.XConfiguration;
059 import org.apache.oozie.util.XLog;
060 import org.apache.oozie.util.XmlUtils;
061 import org.jdom.Attribute;
062 import org.jdom.Element;
063 import org.jdom.JDOMException;
064
065 /**
066 * The Recovery Service checks for pending actions and premater coordinator jobs older than a configured age and then
067 * queues them for execution.
068 */
069 public class RecoveryService implements Service {
070
071 public static final String CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService.";
072 public static final String CONF_PREFIX_WF_ACTIONS = Service.CONF_PREFIX + "wf.actions.";
073 public static final String CONF_PREFIX_COORD = Service.CONF_PREFIX + "coord.";
074 public static final String CONF_PREFIX_BUNDLE = Service.CONF_PREFIX + "bundle.";
075 /**
076 * Time interval, in seconds, at which the recovery service will be scheduled to run.
077 */
078 public static final String CONF_SERVICE_INTERVAL = CONF_PREFIX + "interval";
079 /**
080 * The number of callables to be queued in a batch.
081 */
082 public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
083 /**
084 * Age of actions to queue, in seconds.
085 */
086 public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
087 /**
088 * Age of coordinator jobs to recover, in seconds.
089 */
090 public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than";
091
092 /**
093 * Age of Bundle jobs to recover, in seconds.
094 */
095 public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than";
096
097 private static final String INSTRUMENTATION_GROUP = "recovery";
098 private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
099 private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
100 private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
101
102
103 /**
104 * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
105 * queuing of commands.
106 */
107 static class RecoveryRunnable implements Runnable {
108 private final long olderThan;
109 private final long coordOlderThan;
110 private final long bundleOlderThan;
111 private long delay = 0;
112 private List<XCallable<?>> callables;
113 private List<XCallable<?>> delayedCallables;
114 private StringBuilder msg = null;
115 private JPAService jpaService = null;
116
117 public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) {
118 this.olderThan = olderThan;
119 this.coordOlderThan = coordOlderThan;
120 this.bundleOlderThan = bundleOlderThan;
121 }
122
123 public void run() {
124 XLog.Info.get().clear();
125 XLog log = XLog.getLog(getClass());
126 msg = new StringBuilder();
127 jpaService = Services.get().get(JPAService.class);
128 runWFRecovery();
129 runCoordActionRecovery();
130 runCoordActionRecoveryForReady();
131 runBundleRecovery();
132 log.debug("QUEUING [{0}] for potential recovery", msg.toString());
133 boolean ret = false;
134 if (null != callables) {
135 ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
136 if (ret == false) {
137 log.warn("Unable to queue the callables commands for RecoveryService. "
138 + "Most possibly command queue is full. Queue size is :"
139 + Services.get().get(CallableQueueService.class).queueSize());
140 }
141 callables = null;
142 }
143 if (null != delayedCallables) {
144 ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
145 if (ret == false) {
146 log.warn("Unable to queue the delayedCallables commands for RecoveryService. "
147 + "Most possibly Callable queue is full. Queue size is :"
148 + Services.get().get(CallableQueueService.class).queueSize());
149 }
150 delayedCallables = null;
151 this.delay = 0;
152 }
153 }
154
155 private void runBundleRecovery(){
156 XLog.Info.get().clear();
157 XLog log = XLog.getLog(getClass());
158 List<BundleActionBean> bactions = null;
159 try {
160 bactions = jpaService.execute(new BundleActionsGetWaitingOlderJPAExecutor(bundleOlderThan));
161 }
162 catch (JPAExecutorException ex) {
163 log.warn("Error reading bundle actions from database", ex);
164 return;
165 }
166 msg.append(", BUNDLE_ACTIONS : " + bactions.size());
167 for (BundleActionBean baction : bactions) {
168 try {
169 Services.get().get(InstrumentationService.class).get()
170 .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
171 if (baction.getCoordId() == null) {
172 log.error("CoordId is null for Bundle action " + baction.getBundleActionId());
173 continue;
174 }
175 if (baction.getStatus() == Job.Status.PREP) {
176 BundleJobBean bundleJob = null;
177
178 if (jpaService != null) {
179 bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(baction.getBundleId()));
180 }
181 if (bundleJob != null) {
182 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
183 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
184 for (Element coordElem : coordElems) {
185 Attribute name = coordElem.getAttribute("name");
186 if (name.getValue().equals(baction.getCoordName())) {
187 Configuration coordConf = mergeConfig(coordElem, bundleJob);
188 coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
189 queueCallable(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(),
190 bundleJob.getId(), name.getValue()));
191 }
192 }
193 }
194
195 }
196 else if (baction.getStatus() == Job.Status.KILLED) {
197 queueCallable(new CoordKillXCommand(baction.getCoordId()));
198 }
199 else if (baction.getStatus() == Job.Status.SUSPENDED
200 || baction.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
201 queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
202 }
203 else if (baction.getStatus() == Job.Status.RUNNING
204 || baction.getStatus() == Job.Status.RUNNINGWITHERROR) {
205 queueCallable(new CoordResumeXCommand(baction.getCoordId()));
206 }
207 }
208 catch (Exception ex) {
209 log.error("Exception, {0}", ex.getMessage(), ex);
210 }
211 }
212
213
214 }
215
216 /**
217 * Recover coordinator actions that are staying in WAITING or SUBMITTED too long
218 */
219 private void runCoordActionRecovery() {
220 XLog.Info.get().clear();
221 XLog log = XLog.getLog(getClass());
222 List<CoordinatorActionBean> cactions = null;
223 try {
224 cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
225 }
226 catch (JPAExecutorException ex) {
227 log.warn("Error reading coord actions from database", ex);
228 return;
229 }
230 msg.append(", COORD_ACTIONS : " + cactions.size());
231 for (CoordinatorActionBean caction : cactions) {
232 try {
233 Services.get().get(InstrumentationService.class).get()
234 .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
235 if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
236 queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));
237
238 log.info("Recover a WAITTING coord action and resubmit CoordActionInputCheckXCommand :"
239 + caction.getId());
240 }
241 else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) {
242 CoordinatorJobBean coordJob = jpaService
243 .execute(new CoordJobGetJPAExecutor(caction.getJobId()));
244 queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(),
245 coordJob.getAuthToken(), caction.getJobId()));
246
247 log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :"
248 + caction.getId());
249 }
250 else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) {
251 if (caction.getExternalId() != null) {
252 queueCallable(new SuspendXCommand(caction.getExternalId()));
253 log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :"
254 + caction.getId());
255 }
256 }
257 else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) {
258 if (caction.getExternalId() != null) {
259 queueCallable(new KillXCommand(caction.getExternalId()));
260 log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId());
261 }
262 }
263 else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) {
264 if (caction.getExternalId() != null) {
265 queueCallable(new ResumeXCommand(caction.getExternalId()));
266 log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
267 }
268 }
269 }
270 catch (Exception ex) {
271 log.error("Exception, {0}", ex.getMessage(), ex);
272 }
273 }
274
275
276 }
277
278 /**
279 * Recover coordinator actions that are staying in READY too long
280 */
281 private void runCoordActionRecoveryForReady() {
282 XLog.Info.get().clear();
283 XLog log = XLog.getLog(getClass());
284
285 try {
286 List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan));
287 msg.append(", COORD_READY_JOBS : " + jobids.size());
288 for (String jobid : jobids) {
289 queueCallable(new CoordActionReadyXCommand(jobid));
290
291 log.info("Recover READY coord actions for jobid :" + jobid);
292 }
293 }
294 catch (Exception ex) {
295 log.error("Exception, {0}", ex.getMessage(), ex);
296 }
297 }
298
299 /**
300 * Recover wf actions
301 */
302 private void runWFRecovery() {
303 XLog.Info.get().clear();
304 XLog log = XLog.getLog(getClass());
305 // queue command for action recovery
306 List<WorkflowActionBean> actions = null;
307 try {
308 actions = jpaService.execute(new WorkflowActionsGetPendingJPAExecutor(olderThan));
309 }
310 catch (JPAExecutorException ex) {
311 log.warn("Exception while reading pending actions from storage", ex);
312 return;
313 }
314 // log.debug("QUEUING[{0}] pending wf actions for potential recovery",
315 // actions.size());
316 msg.append(" WF_ACTIONS " + actions.size());
317
318 for (WorkflowActionBean action : actions) {
319 try {
320 Services.get().get(InstrumentationService.class).get()
321 .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_ACTIONS_COUNTER, 1);
322 if (action.getStatus() == WorkflowActionBean.Status.PREP
323 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
324 queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
325 }
326 else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
327 Date nextRunTime = action.getPendingAge();
328 queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime()
329 - System.currentTimeMillis());
330 }
331 else if (action.getStatus() == WorkflowActionBean.Status.DONE
332 || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
333 queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
334 }
335 else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
336 Date nextRunTime = action.getPendingAge();
337 queueCallable(new ActionEndXCommand(action.getId(), action.getType()), nextRunTime.getTime()
338 - System.currentTimeMillis());
339
340 }
341 else if (action.getStatus() == WorkflowActionBean.Status.OK
342 || action.getStatus() == WorkflowActionBean.Status.ERROR) {
343 queueCallable(new SignalXCommand(action.getJobId(), action.getId()));
344 }
345 else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
346 queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
347 }
348 }
349 catch (Exception ex) {
350 log.error("Exception, {0}", ex.getMessage(), ex);
351 }
352 }
353
354 }
355
356 /**
357 * Adds callables to a list. If the number of callables in the list reaches {@link
358 * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset.
359 *
360 * @param callable the callable to queue.
361 */
362 private void queueCallable(XCallable<?> callable) {
363 if (callables == null) {
364 callables = new ArrayList<XCallable<?>>();
365 }
366 callables.add(callable);
367 if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
368 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
369 if (ret == false) {
370 XLog.getLog(getClass()).warn(
371 "Unable to queue the callables commands for RecoveryService. "
372 + "Most possibly command queue is full. Queue size is :"
373 + Services.get().get(CallableQueueService.class).queueSize());
374 }
375 callables = new ArrayList<XCallable<?>>();
376 }
377 }
378
379 /**
380 * Adds callables to a list. If the number of callables in the list reaches {@link
381 * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay
382 * of the callables in the list. The callables list and the delay is reset.
383 *
384 * @param callable the callable to queue.
385 * @param delay the delay for the callable.
386 */
387 private void queueCallable(XCallable<?> callable, long delay) {
388 if (delayedCallables == null) {
389 delayedCallables = new ArrayList<XCallable<?>>();
390 }
391 this.delay = Math.max(this.delay, delay);
392 delayedCallables.add(callable);
393 if (delayedCallables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
394 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
395 if (ret == false) {
396 XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. "
397 + "Most possibly Callable queue is full. Queue size is :"
398 + Services.get().get(CallableQueueService.class).queueSize());
399 }
400 delayedCallables = new ArrayList<XCallable<?>>();
401 this.delay = 0;
402 }
403 }
404 }
405
406 /**
407 * Initializes the RecoveryService.
408 *
409 * @param services services instance.
410 */
411 @Override
412 public void init(Services services) {
413 Configuration conf = services.getConf();
414 Runnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120), conf.getInt(
415 CONF_COORD_OLDER_THAN, 600),conf.getInt(CONF_BUNDLE_OLDER_THAN, 600));
416 services.get(SchedulerService.class).schedule(recoveryRunnable, 10, conf.getInt(CONF_SERVICE_INTERVAL, 600),
417 SchedulerService.Unit.SEC);
418 }
419
420 /**
421 * Destroy the Recovery Service.
422 */
423 @Override
424 public void destroy() {
425 }
426
427 /**
428 * Return the public interface for the Recovery Service.
429 *
430 * @return {@link RecoveryService}.
431 */
432 @Override
433 public Class<? extends Service> getInterface() {
434 return RecoveryService.class;
435 }
436
437 /**
438 * Merge Bundle job config and the configuration from the coord job to pass
439 * to Coord Engine
440 *
441 * @param coordElem the coordinator configuration
442 * @return Configuration merged configuration
443 * @throws CommandException thrown if failed to merge configuration
444 */
445 private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException {
446 XLog.Info.get().clear();
447 XLog log = XLog.getLog("RecoveryService");
448
449 String jobConf = bundleJob.getConf();
450 // Step 1: runConf = jobConf
451 Configuration runConf = null;
452 try {
453 runConf = new XConfiguration(new StringReader(jobConf));
454 }
455 catch (IOException e1) {
456 log.warn("Configuration parse error in:" + jobConf);
457 throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
458 }
459 // Step 2: Merge local properties into runConf
460 // extract 'property' tags under 'configuration' block in the coordElem
461 // convert Element to XConfiguration
462 Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
463
464 if (localConfigElement != null) {
465 String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
466 Configuration localConf;
467 try {
468 localConf = new XConfiguration(new StringReader(strConfig));
469 }
470 catch (IOException e1) {
471 log.warn("Configuration parse error in:" + strConfig);
472 throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
473 }
474
475 // copy configuration properties in the coordElem to the runConf
476 XConfiguration.copy(localConf, runConf);
477 }
478
479 // Step 3: Extract value of 'app-path' in coordElem, save it as a
480 // new property called 'oozie.coord.application.path', and normalize.
481 String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
482 runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
483 // Normalize coordinator appPath here;
484 try {
485 JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
486 }
487 catch (IOException e) {
488 throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
489 }
490 return runConf;
491 }
492 }