001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.io.IOException;
021import java.io.StringReader;
022import java.net.URI;
023import java.util.Arrays;
024import java.util.Date;
025import java.util.List;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.oozie.CoordinatorActionBean;
029import org.apache.oozie.CoordinatorJobBean;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.client.CoordinatorAction;
032import org.apache.oozie.client.Job;
033import org.apache.oozie.client.OozieClient;
034import org.apache.oozie.command.CommandException;
035import org.apache.oozie.command.PreconditionException;
036import org.apache.oozie.dependency.DependencyChecker;
037import org.apache.oozie.dependency.ActionDependency;
038import org.apache.oozie.dependency.URIHandler;
039import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
040import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
041import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
042import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
043import org.apache.oozie.executor.jpa.JPAExecutorException;
044import org.apache.oozie.service.CallableQueueService;
045import org.apache.oozie.service.EventHandlerService;
046import org.apache.oozie.service.JPAService;
047import org.apache.oozie.service.PartitionDependencyManagerService;
048import org.apache.oozie.service.RecoveryService;
049import org.apache.oozie.service.Service;
050import org.apache.oozie.service.Services;
051import org.apache.oozie.service.URIHandlerService;
052import org.apache.oozie.util.LogUtils;
053import org.apache.oozie.util.StatusUtils;
054import org.apache.oozie.util.XConfiguration;
055import org.apache.oozie.util.XLog;
056import org.apache.oozie.util.DateUtils;
057
058public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> {
059    protected String actionId;
060    protected JPAService jpaService = null;
061    protected CoordinatorActionBean coordAction = null;
062    protected CoordinatorJobBean coordJob = null;
063
064    /**
065     * Property name of command re-queue interval for coordinator push check in
066     * milliseconds.
067     */
068    public static final String CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
069            + "coord.push.check.requeue.interval";
070    /**
071     * Default re-queue interval in ms. It is applied when no value defined in
072     * the oozie configuration.
073     */
074    private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 600000;
075    private boolean registerForNotification;
076    private boolean removeAvailDependencies;
077
078    public CoordPushDependencyCheckXCommand(String actionId) {
079        this(actionId, false, true);
080    }
081
082    public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification) {
083        this(actionId, registerForNotification, !registerForNotification);
084    }
085
086    public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification,
087            boolean removeAvailDependencies) {
088        super("coord_push_dep_check", "coord_push_dep_check", 0);
089        this.actionId = actionId;
090        this.registerForNotification = registerForNotification;
091        this.removeAvailDependencies = removeAvailDependencies;
092    }
093
094    protected CoordPushDependencyCheckXCommand(String actionName, String actionId) {
095        super(actionName, actionName, 0);
096        this.actionId = actionId;
097    }
098
099    @Override
100    protected Void execute() throws CommandException {
101        String pushMissingDeps = coordAction.getPushMissingDependencies();
102        if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
103            LOG.info("Nothing to check. Empty push missing dependency");
104        }
105        else {
106            String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
107            LOG.info("First Push missing dependency is [{0}] ", missingDepsArray[0]);
108            LOG.trace("Push missing dependencies are [{0}] ", pushMissingDeps);
109            if (registerForNotification) {
110                LOG.debug("Register for notifications is true");
111            }
112
113            try {
114                Configuration actionConf = null;
115                try {
116                    actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
117                }
118                catch (IOException e) {
119                    throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
120                }
121
122                // Check all dependencies during materialization to avoid registering in the cache.
123                // But check only first missing one afterwards similar to
124                // CoordActionInputCheckXCommand for efficiency. listPartitions is costly.
125                ActionDependency actionDep = DependencyChecker.checkForAvailability(missingDepsArray, actionConf,
126                        !registerForNotification);
127
128                boolean isChangeInDependency = true;
129                boolean timeout = false;
130                if (actionDep.getMissingDependencies().size() == 0) {
131                    // All push-based dependencies are available
132                    onAllPushDependenciesAvailable();
133                }
134                else {
135                    if (actionDep.getMissingDependencies().size() == missingDepsArray.length) {
136                        isChangeInDependency = false;
137                    }
138                    else {
139                        String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDep
140                                .getMissingDependencies());
141                        coordAction.setPushMissingDependencies(stillMissingDeps);
142                    }
143                    // Checking for timeout
144                    timeout = isTimeout();
145                    if (timeout) {
146                        queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
147                    }
148                    else {
149                        queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
150                                getCoordPushCheckRequeueInterval());
151                    }
152                }
153
154                updateCoordAction(coordAction, isChangeInDependency);
155                if (registerForNotification) {
156                    registerForNotification(actionDep.getMissingDependencies(), actionConf);
157                }
158                if (removeAvailDependencies) {
159                    unregisterAvailableDependencies(actionDep.getAvailableDependencies());
160                }
161                if (timeout) {
162                    unregisterMissingDependencies(actionDep.getMissingDependencies(), actionId);
163                }
164            }
165            catch (Exception e) {
166                final CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
167                if (isTimeout()) {
168                    LOG.debug("Queueing timeout command");
169                    // XCommand.queue() will not work when there is a Exception
170                    callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
171                    unregisterMissingDependencies(Arrays.asList(missingDepsArray), actionId);
172                }
173                else if (coordAction.getMissingDependencies() != null
174                        && coordAction.getMissingDependencies().length() > 0) {
175                    // Queue again on exception as RecoveryService will not queue this again with
176                    // the action being updated regularly by CoordActionInputCheckXCommand
177                    callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(),
178                            registerForNotification, removeAvailDependencies),
179                            Services.get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000);
180                }
181                throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
182            }
183        }
184        return null;
185    }
186
187    /**
188     * Return the re-queue interval for coord push dependency check
189     * @return
190     */
191    public long getCoordPushCheckRequeueInterval() {
192        long requeueInterval = Services.get().getConf().getLong(CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL,
193                DEFAULT_COMMAND_REQUEUE_INTERVAL);
194        return requeueInterval;
195    }
196
197    /**
198     * Returns true if timeout period has been reached
199     *
200     * @return true if it is time for timeout else false
201     */
202    protected boolean isTimeout() {
203        long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
204                .getCreatedTime().getTime()))
205                / (60 * 1000);
206        int timeOut = coordAction.getTimeOut();
207        return (timeOut >= 0) && (waitingTime > timeOut);
208    }
209
210    protected void onAllPushDependenciesAvailable() throws CommandException {
211        coordAction.setPushMissingDependencies("");
212        Services.get().get(PartitionDependencyManagerService.class)
213                .removeCoordActionWithDependenciesAvailable(coordAction.getId());
214        if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) {
215            Date nominalTime = coordAction.getNominalTime();
216            Date currentTime = new Date();
217            // The action should become READY only if current time > nominal time;
218            // CoordActionInputCheckXCommand will take care of moving it to READY when it is nominal time.
219            if (nominalTime.compareTo(currentTime) > 0) {
220                LOG.info("[" + actionId + "]::ActionInputCheck:: nominal Time is newer than current time. Current="
221                        + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime));
222            }
223            else {
224                String actionXml = resolveCoordConfiguration();
225                coordAction.setActionXml(actionXml);
226                coordAction.setStatus(CoordinatorAction.Status.READY);
227                // pass jobID to the CoordActionReadyXCommand
228                queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
229            }
230        }
231        else if (isTimeout()) {
232            // If it is timeout and all push dependencies are available but still some unresolved
233            // missing dependencies queue CoordActionInputCheckXCommand now. Else it will have to
234            // wait till RecoveryService kicks in
235            queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()));
236        }
237    }
238
239    private String resolveCoordConfiguration() throws CommandException {
240        try {
241            Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
242            StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
243            String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
244                    actionId);
245            actionXml.replace(0, actionXml.length(), newActionXml);
246            return actionXml.toString();
247        }
248        catch (Exception e) {
249            throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
250        }
251    }
252
253    protected void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
254            throws CommandException {
255        coordAction.setLastModifiedTime(new Date());
256        if (jpaService != null) {
257            try {
258                if (isChangeInDependency) {
259                    CoordActionQueryExecutor.getInstance().executeUpdate(
260                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK, coordAction);
261                    if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) {
262                        // since event is not to be generated unless action
263                        // RUNNING via StartX
264                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
265                    }
266                }
267                else {
268                    CoordActionQueryExecutor.getInstance().executeUpdate(
269                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
270                }
271            }
272            catch (JPAExecutorException jex) {
273                throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
274            }
275        }
276    }
277
278    private void registerForNotification(List<String> missingDeps, Configuration actionConf) {
279        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
280        String user = actionConf.get(OozieClient.USER_NAME, OozieClient.USER_NAME);
281        for (String missingDep : missingDeps) {
282            try {
283                URI missingURI = new URI(missingDep);
284                URIHandler handler = uriService.getURIHandler(missingURI);
285                handler.registerForNotification(missingURI, actionConf, user, actionId);
286                    LOG.debug("Registered uri [{0}] for notifications", missingURI);
287            }
288            catch (Exception e) {
289                LOG.warn("Exception while registering uri [{0}] for notifications", missingDep, e);
290            }
291        }
292    }
293
294    private void unregisterAvailableDependencies(List<String> availableDeps) {
295        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
296        for (String availableDep : availableDeps) {
297            try {
298                URI availableURI = new URI(availableDep);
299                URIHandler handler = uriService.getURIHandler(availableURI);
300                if (handler.unregisterFromNotification(availableURI, actionId)) {
301                    LOG.debug("Successfully unregistered uri [{0}] from notifications", availableURI);
302                }
303                else {
304                    LOG.warn("Unable to unregister uri [{0}] from notifications", availableURI);
305                }
306            }
307            catch (Exception e) {
308                LOG.warn("Exception while unregistering uri [{0}] from notifications", availableDep, e);
309            }
310        }
311    }
312
313    public static void unregisterMissingDependencies(List<String> missingDeps, String actionId) {
314        final XLog LOG = XLog.getLog(CoordPushDependencyCheckXCommand.class);
315        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
316        for (String missingDep : missingDeps) {
317            try {
318                URI missingURI = new URI(missingDep);
319                URIHandler handler = uriService.getURIHandler(missingURI);
320                if (handler.unregisterFromNotification(missingURI, actionId)) {
321                    LOG.debug("Successfully unregistered uri [{0}] from notifications", missingURI);
322                }
323                else {
324                    LOG.warn("Unable to unregister uri [{0}] from notifications", missingURI);
325                }
326            }
327            catch (Exception e) {
328                LOG.warn("Exception while unregistering uri [{0}] from notifications", missingDep, e);
329            }
330        }
331    }
332
333    @Override
334    public String getEntityKey() {
335        return actionId.substring(0, actionId.indexOf("@"));
336    }
337
338    @Override
339    public String getKey(){
340        return getName() + "_" + actionId;
341    }
342
343    @Override
344    protected boolean isLockRequired() {
345        return true;
346    }
347
348    @Override
349    protected void loadState() throws CommandException {
350        jpaService = Services.get().get(JPAService.class);
351        try {
352            coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
353            if (coordAction != null) {
354                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
355                LogUtils.setLogInfo(coordAction, logInfo);
356            }
357            else {
358                throw new CommandException(ErrorCode.E0605, actionId);
359            }
360        }
361        catch (JPAExecutorException je) {
362            throw new CommandException(je);
363        }
364    }
365
366    @Override
367    protected void verifyPrecondition() throws CommandException, PreconditionException {
368        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
369            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
370                    + "]::CoordPushDependencyCheck:: Ignoring action. Should be in WAITING state, but state="
371                    + coordAction.getStatus());
372        }
373
374        // if eligible to do action input check when running with backward
375        // support is true
376        if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
377            return;
378        }
379
380        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR
381                && coordJob.getStatus() != Job.Status.PAUSED && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
382            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
383                    + "]::CoordPushDependencyCheck:: Ignoring action."
384                    + " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
385                    + coordJob.getStatus());
386        }
387    }
388
389}