This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.net.URI;
023 import java.util.Arrays;
024 import java.util.Date;
025 import java.util.List;
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.oozie.CoordinatorActionBean;
028 import org.apache.oozie.CoordinatorJobBean;
029 import org.apache.oozie.ErrorCode;
030 import org.apache.oozie.XException;
031 import org.apache.oozie.client.CoordinatorAction;
032 import org.apache.oozie.client.Job;
033 import org.apache.oozie.client.OozieClient;
034 import org.apache.oozie.command.CommandException;
035 import org.apache.oozie.command.PreconditionException;
036 import org.apache.oozie.dependency.DependencyChecker;
037 import org.apache.oozie.dependency.ActionDependency;
038 import org.apache.oozie.dependency.URIHandler;
039 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
040 import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
041 import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
042 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
043 import org.apache.oozie.executor.jpa.JPAExecutorException;
044 import org.apache.oozie.service.CallableQueueService;
045 import org.apache.oozie.service.EventHandlerService;
046 import org.apache.oozie.service.JPAService;
047 import org.apache.oozie.service.RecoveryService;
048 import org.apache.oozie.service.Service;
049 import org.apache.oozie.service.Services;
050 import org.apache.oozie.service.URIHandlerService;
051 import org.apache.oozie.util.LogUtils;
052 import org.apache.oozie.util.StatusUtils;
053 import org.apache.oozie.util.XConfiguration;
054 import org.apache.oozie.util.XLog;
055
056 public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> {
057 protected String actionId;
058 protected JPAService jpaService = null;
059 protected CoordinatorActionBean coordAction = null;
060 protected CoordinatorJobBean coordJob = null;
061
062 /**
063 * Property name of command re-queue interval for coordinator push check in
064 * milliseconds.
065 */
066 public static final String CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
067 + "coord.push.check.requeue.interval";
068 /**
069 * Default re-queue interval in ms. It is applied when no value defined in
070 * the oozie configuration.
071 */
072 private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 600000;
073 private boolean registerForNotification;
074 private boolean removeAvailDependencies;
075
076 public CoordPushDependencyCheckXCommand(String actionId) {
077 this(actionId, false, true);
078 }
079
080 public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification) {
081 this(actionId, registerForNotification, !registerForNotification);
082 }
083
084 public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification,
085 boolean removeAvailDependencies) {
086 super("coord_push_dep_check", "coord_push_dep_check", 0);
087 this.actionId = actionId;
088 this.registerForNotification = registerForNotification;
089 this.removeAvailDependencies = removeAvailDependencies;
090 }
091
092 protected CoordPushDependencyCheckXCommand(String actionName, String actionId) {
093 super(actionName, actionName, 0);
094 this.actionId = actionId;
095 }
096
097 @Override
098 protected Void execute() throws CommandException {
099 String pushMissingDeps = coordAction.getPushMissingDependencies();
100 if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
101 LOG.info("Nothing to check. Empty push missing dependency");
102 }
103 else {
104 String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
105 LOG.info("First Push missing dependency is [{0}] ", missingDepsArray[0]);
106 LOG.trace("Push missing dependencies are [{0}] ", pushMissingDeps);
107 if (registerForNotification) {
108 LOG.debug("Register for notifications is true");
109 }
110
111 try {
112 Configuration actionConf = null;
113 try {
114 actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
115 }
116 catch (IOException e) {
117 throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
118 }
119
120 // Check all dependencies during materialization to avoid registering in the cache.
121 // But check only first missing one afterwards similar to
122 // CoordActionInputCheckXCommand for efficiency. listPartitions is costly.
123 ActionDependency actionDep = DependencyChecker.checkForAvailability(missingDepsArray, actionConf,
124 !registerForNotification);
125
126 boolean isChangeInDependency = true;
127 boolean timeout = false;
128 if (actionDep.getMissingDependencies().size() == 0) {
129 // All push-based dependencies are available
130 onAllPushDependenciesAvailable();
131 }
132 else {
133 if (actionDep.getMissingDependencies().size() == missingDepsArray.length) {
134 isChangeInDependency = false;
135 }
136 else {
137 String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDep
138 .getMissingDependencies());
139 coordAction.setPushMissingDependencies(stillMissingDeps);
140 }
141 // Checking for timeout
142 timeout = isTimeout();
143 if (timeout) {
144 queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
145 }
146 else {
147 queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
148 getCoordPushCheckRequeueInterval());
149 }
150 }
151
152 updateCoordAction(coordAction, isChangeInDependency);
153 if (registerForNotification) {
154 registerForNotification(actionDep.getMissingDependencies(), actionConf);
155 }
156 if (removeAvailDependencies) {
157 unregisterAvailableDependencies(actionDep.getAvailableDependencies());
158 }
159 if (timeout) {
160 unregisterMissingDependencies(actionDep.getMissingDependencies(), actionId);
161 }
162 }
163 catch (Exception e) {
164 final CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
165 if (isTimeout()) {
166 LOG.debug("Queueing timeout command");
167 // XCommand.queue() will not work when there is a Exception
168 callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
169 unregisterMissingDependencies(Arrays.asList(missingDepsArray), actionId);
170 }
171 else if (coordAction.getMissingDependencies() != null
172 && coordAction.getMissingDependencies().length() > 0) {
173 // Queue again on exception as RecoveryService will not queue this again with
174 // the action being updated regularly by CoordActionInputCheckXCommand
175 callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(),
176 registerForNotification, removeAvailDependencies),
177 Services.get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000);
178 }
179 throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
180 }
181 }
182 return null;
183 }
184
185 /**
186 * Return the re-queue interval for coord push dependency check
187 * @return
188 */
189 public long getCoordPushCheckRequeueInterval() {
190 long requeueInterval = Services.get().getConf().getLong(CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL,
191 DEFAULT_COMMAND_REQUEUE_INTERVAL);
192 return requeueInterval;
193 }
194
195 /**
196 * Returns true if timeout period has been reached
197 *
198 * @return true if it is time for timeout else false
199 */
200 protected boolean isTimeout() {
201 long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
202 .getCreatedTime().getTime()))
203 / (60 * 1000);
204 int timeOut = coordAction.getTimeOut();
205 return (timeOut >= 0) && (waitingTime > timeOut);
206 }
207
208 protected void onAllPushDependenciesAvailable() throws CommandException {
209 coordAction.setPushMissingDependencies("");
210 if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) {
211 Date nominalTime = coordAction.getNominalTime();
212 Date currentTime = new Date();
213 // The action should become READY only if current time > nominal time;
214 // CoordActionInputCheckXCommand will take care of moving it to READY when it is nominal time.
215 if (nominalTime.compareTo(currentTime) > 0) {
216 LOG.info("[" + actionId + "]::ActionInputCheck:: nominal Time is newer than current time. Current="
217 + currentTime + ", nominal=" + nominalTime);
218 }
219 else {
220 String actionXml = resolveCoordConfiguration();
221 coordAction.setActionXml(actionXml);
222 coordAction.setStatus(CoordinatorAction.Status.READY);
223 // pass jobID to the CoordActionReadyXCommand
224 queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
225 }
226 }
227 else if (isTimeout()) {
228 // If it is timeout and all push dependencies are available but still some unresolved
229 // missing dependencies queue CoordActionInputCheckXCommand now. Else it will have to
230 // wait till RecoveryService kicks in
231 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()));
232 }
233 }
234
235 private String resolveCoordConfiguration() throws CommandException {
236 try {
237 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
238 StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
239 String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
240 actionId);
241 actionXml.replace(0, actionXml.length(), newActionXml);
242 return actionXml.toString();
243 }
244 catch (Exception e) {
245 throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
246 }
247 }
248
249 protected void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
250 throws CommandException {
251 coordAction.setLastModifiedTime(new Date());
252 if (jpaService != null) {
253 try {
254 if (isChangeInDependency) {
255 jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
256 if (EventHandlerService.isEnabled()
257 && coordAction.getStatus() != CoordinatorAction.Status.READY) {
258 //since event is not to be generated unless action RUNNING via StartX
259 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
260 }
261 }
262 else {
263 jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
264 }
265 }
266 catch (JPAExecutorException jex) {
267 throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
268 }
269 }
270 }
271
272 private void registerForNotification(List<String> missingDeps, Configuration actionConf) {
273 URIHandlerService uriService = Services.get().get(URIHandlerService.class);
274 String user = actionConf.get(OozieClient.USER_NAME, OozieClient.USER_NAME);
275 for (String missingDep : missingDeps) {
276 try {
277 URI missingURI = new URI(missingDep);
278 URIHandler handler = uriService.getURIHandler(missingURI);
279 handler.registerForNotification(missingURI, actionConf, user, actionId);
280 LOG.debug("Registered uri [{0}] for notifications", missingURI);
281 }
282 catch (Exception e) {
283 LOG.warn("Exception while registering uri [{0}] for notifications", missingDep, e);
284 }
285 }
286 }
287
288 private void unregisterAvailableDependencies(List<String> availableDeps) {
289 URIHandlerService uriService = Services.get().get(URIHandlerService.class);
290 for (String availableDep : availableDeps) {
291 try {
292 URI availableURI = new URI(availableDep);
293 URIHandler handler = uriService.getURIHandler(availableURI);
294 if (handler.unregisterFromNotification(availableURI, actionId)) {
295 LOG.debug("Successfully unregistered uri [{0}] from notifications", availableURI);
296 }
297 else {
298 LOG.warn("Unable to unregister uri [{0}] from notifications", availableURI);
299 }
300 }
301 catch (Exception e) {
302 LOG.warn("Exception while unregistering uri [{0}] from notifications", availableDep, e);
303 }
304 }
305 }
306
307 public static void unregisterMissingDependencies(List<String> missingDeps, String actionId) {
308 final XLog LOG = XLog.getLog(CoordPushDependencyCheckXCommand.class);
309 URIHandlerService uriService = Services.get().get(URIHandlerService.class);
310 for (String missingDep : missingDeps) {
311 try {
312 URI missingURI = new URI(missingDep);
313 URIHandler handler = uriService.getURIHandler(missingURI);
314 if (handler.unregisterFromNotification(missingURI, actionId)) {
315 LOG.debug("Successfully unregistered uri [{0}] from notifications", missingURI);
316 }
317 else {
318 LOG.warn("Unable to unregister uri [{0}] from notifications", missingURI);
319 }
320 }
321 catch (Exception e) {
322 LOG.warn("Exception while unregistering uri [{0}] from notifications", missingDep, e);
323 }
324 }
325 }
326
327 @Override
328 public String getEntityKey() {
329 return coordAction.getJobId();
330 }
331
332 @Override
333 public String getKey(){
334 return getName() + "_" + actionId;
335 }
336
337 @Override
338 protected boolean isLockRequired() {
339 return true;
340 }
341
342 @Override
343 protected void eagerLoadState() throws CommandException {
344 try {
345 jpaService = Services.get().get(JPAService.class);
346
347 if (jpaService != null) {
348 coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
349 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
350 LogUtils.setLogInfo(coordAction, logInfo);
351 }
352 else {
353 throw new CommandException(ErrorCode.E0610);
354 }
355 }
356 catch (XException ex) {
357 throw new CommandException(ex);
358 }
359 }
360
361 @Override
362 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
363 if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
364 throw new PreconditionException(ErrorCode.E1100, "[" + actionId
365 + "]::CoordPushDependencyCheck:: Ignoring action. Should be in WAITING state, but state="
366 + coordAction.getStatus());
367 }
368
369 // if eligible to do action input check when running with backward
370 // support is true
371 if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
372 return;
373 }
374
375 if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR
376 && coordJob.getStatus() != Job.Status.PAUSED && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
377 throw new PreconditionException(ErrorCode.E1100, "[" + actionId
378 + "]::CoordPushDependencyCheck:: Ignoring action."
379 + " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
380 + coordJob.getStatus());
381 }
382 }
383
384 @Override
385 protected void loadState() throws CommandException {
386 eagerLoadState();
387 }
388
389 @Override
390 protected void verifyPrecondition() throws CommandException, PreconditionException {
391 eagerVerifyPrecondition();
392 }
393
394 }