001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.net.URI;
023    import java.util.Arrays;
024    import java.util.Date;
025    import java.util.List;
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.oozie.CoordinatorActionBean;
028    import org.apache.oozie.CoordinatorJobBean;
029    import org.apache.oozie.ErrorCode;
030    import org.apache.oozie.XException;
031    import org.apache.oozie.client.CoordinatorAction;
032    import org.apache.oozie.client.Job;
033    import org.apache.oozie.client.OozieClient;
034    import org.apache.oozie.command.CommandException;
035    import org.apache.oozie.command.PreconditionException;
036    import org.apache.oozie.dependency.DependencyChecker;
037    import org.apache.oozie.dependency.ActionDependency;
038    import org.apache.oozie.dependency.URIHandler;
039    import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
040    import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
041    import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
042    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
043    import org.apache.oozie.executor.jpa.JPAExecutorException;
044    import org.apache.oozie.service.CallableQueueService;
045    import org.apache.oozie.service.EventHandlerService;
046    import org.apache.oozie.service.JPAService;
047    import org.apache.oozie.service.RecoveryService;
048    import org.apache.oozie.service.Service;
049    import org.apache.oozie.service.Services;
050    import org.apache.oozie.service.URIHandlerService;
051    import org.apache.oozie.util.LogUtils;
052    import org.apache.oozie.util.StatusUtils;
053    import org.apache.oozie.util.XConfiguration;
054    import org.apache.oozie.util.XLog;
055    
056    public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> {
057        protected String actionId;
058        protected JPAService jpaService = null;
059        protected CoordinatorActionBean coordAction = null;
060        protected CoordinatorJobBean coordJob = null;
061    
062        /**
063         * Property name of command re-queue interval for coordinator push check in
064         * milliseconds.
065         */
066        public static final String CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
067                + "coord.push.check.requeue.interval";
068        /**
069         * Default re-queue interval in ms. It is applied when no value defined in
070         * the oozie configuration.
071         */
072        private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 600000;
073        private boolean registerForNotification;
074        private boolean removeAvailDependencies;
075    
076        public CoordPushDependencyCheckXCommand(String actionId) {
077            this(actionId, false, true);
078        }
079    
080        public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification) {
081            this(actionId, registerForNotification, !registerForNotification);
082        }
083    
084        public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification,
085                boolean removeAvailDependencies) {
086            super("coord_push_dep_check", "coord_push_dep_check", 0);
087            this.actionId = actionId;
088            this.registerForNotification = registerForNotification;
089            this.removeAvailDependencies = removeAvailDependencies;
090        }
091    
092        protected CoordPushDependencyCheckXCommand(String actionName, String actionId) {
093            super(actionName, actionName, 0);
094            this.actionId = actionId;
095        }
096    
097        @Override
098        protected Void execute() throws CommandException {
099            String pushMissingDeps = coordAction.getPushMissingDependencies();
100            if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
101                LOG.info("Nothing to check. Empty push missing dependency");
102            }
103            else {
104                String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
105                LOG.info("First Push missing dependency is [{0}] ", missingDepsArray[0]);
106                LOG.trace("Push missing dependencies are [{0}] ", pushMissingDeps);
107                if (registerForNotification) {
108                    LOG.debug("Register for notifications is true");
109                }
110    
111                try {
112                    Configuration actionConf = null;
113                    try {
114                        actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
115                    }
116                    catch (IOException e) {
117                        throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
118                    }
119    
120                    // Check all dependencies during materialization to avoid registering in the cache.
121                    // But check only first missing one afterwards similar to
122                    // CoordActionInputCheckXCommand for efficiency. listPartitions is costly.
123                    ActionDependency actionDep = DependencyChecker.checkForAvailability(missingDepsArray, actionConf,
124                            !registerForNotification);
125    
126                    boolean isChangeInDependency = true;
127                    boolean timeout = false;
128                    if (actionDep.getMissingDependencies().size() == 0) {
129                        // All push-based dependencies are available
130                        onAllPushDependenciesAvailable();
131                    }
132                    else {
133                        if (actionDep.getMissingDependencies().size() == missingDepsArray.length) {
134                            isChangeInDependency = false;
135                        }
136                        else {
137                            String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDep
138                                    .getMissingDependencies());
139                            coordAction.setPushMissingDependencies(stillMissingDeps);
140                        }
141                        // Checking for timeout
142                        timeout = isTimeout();
143                        if (timeout) {
144                            queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
145                        }
146                        else {
147                            queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
148                                    getCoordPushCheckRequeueInterval());
149                        }
150                    }
151    
152                    updateCoordAction(coordAction, isChangeInDependency);
153                    if (registerForNotification) {
154                        registerForNotification(actionDep.getMissingDependencies(), actionConf);
155                    }
156                    if (removeAvailDependencies) {
157                        unregisterAvailableDependencies(actionDep.getAvailableDependencies());
158                    }
159                    if (timeout) {
160                        unregisterMissingDependencies(actionDep.getMissingDependencies(), actionId);
161                    }
162                }
163                catch (Exception e) {
164                    final CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
165                    if (isTimeout()) {
166                        LOG.debug("Queueing timeout command");
167                        // XCommand.queue() will not work when there is a Exception
168                        callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
169                        unregisterMissingDependencies(Arrays.asList(missingDepsArray), actionId);
170                    }
171                    else if (coordAction.getMissingDependencies() != null
172                            && coordAction.getMissingDependencies().length() > 0) {
173                        // Queue again on exception as RecoveryService will not queue this again with
174                        // the action being updated regularly by CoordActionInputCheckXCommand
175                        callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(),
176                                registerForNotification, removeAvailDependencies),
177                                Services.get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000);
178                    }
179                    throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
180                }
181            }
182            return null;
183        }
184    
185        /**
186         * Return the re-queue interval for coord push dependency check
187         * @return
188         */
189        public long getCoordPushCheckRequeueInterval() {
190            long requeueInterval = Services.get().getConf().getLong(CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL,
191                    DEFAULT_COMMAND_REQUEUE_INTERVAL);
192            return requeueInterval;
193        }
194    
195        /**
196         * Returns true if timeout period has been reached
197         *
198         * @return true if it is time for timeout else false
199         */
200        protected boolean isTimeout() {
201            long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
202                    .getCreatedTime().getTime()))
203                    / (60 * 1000);
204            int timeOut = coordAction.getTimeOut();
205            return (timeOut >= 0) && (waitingTime > timeOut);
206        }
207    
208        protected void onAllPushDependenciesAvailable() throws CommandException {
209            coordAction.setPushMissingDependencies("");
210            if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) {
211                Date nominalTime = coordAction.getNominalTime();
212                Date currentTime = new Date();
213                // The action should become READY only if current time > nominal time;
214                // CoordActionInputCheckXCommand will take care of moving it to READY when it is nominal time.
215                if (nominalTime.compareTo(currentTime) > 0) {
216                    LOG.info("[" + actionId + "]::ActionInputCheck:: nominal Time is newer than current time. Current="
217                            + currentTime + ", nominal=" + nominalTime);
218                }
219                else {
220                    String actionXml = resolveCoordConfiguration();
221                    coordAction.setActionXml(actionXml);
222                    coordAction.setStatus(CoordinatorAction.Status.READY);
223                    // pass jobID to the CoordActionReadyXCommand
224                    queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
225                }
226            }
227            else if (isTimeout()) {
228                // If it is timeout and all push dependencies are available but still some unresolved
229                // missing dependencies queue CoordActionInputCheckXCommand now. Else it will have to
230                // wait till RecoveryService kicks in
231                queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()));
232            }
233        }
234    
235        private String resolveCoordConfiguration() throws CommandException {
236            try {
237                Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
238                StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
239                String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
240                        actionId);
241                actionXml.replace(0, actionXml.length(), newActionXml);
242                return actionXml.toString();
243            }
244            catch (Exception e) {
245                throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
246            }
247        }
248    
249        protected void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
250                throws CommandException {
251            coordAction.setLastModifiedTime(new Date());
252            if (jpaService != null) {
253                try {
254                    if (isChangeInDependency) {
255                        jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
256                        if (EventHandlerService.isEnabled()
257                                && coordAction.getStatus() != CoordinatorAction.Status.READY) {
258                            //since event is not to be generated unless action RUNNING via StartX
259                            generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
260                        }
261                    }
262                    else {
263                        jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
264                    }
265                }
266                catch (JPAExecutorException jex) {
267                    throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
268                }
269            }
270        }
271    
272        private void registerForNotification(List<String> missingDeps, Configuration actionConf) {
273            URIHandlerService uriService = Services.get().get(URIHandlerService.class);
274            String user = actionConf.get(OozieClient.USER_NAME, OozieClient.USER_NAME);
275            for (String missingDep : missingDeps) {
276                try {
277                    URI missingURI = new URI(missingDep);
278                    URIHandler handler = uriService.getURIHandler(missingURI);
279                    handler.registerForNotification(missingURI, actionConf, user, actionId);
280                        LOG.debug("Registered uri [{0}] for notifications", missingURI);
281                }
282                catch (Exception e) {
283                    LOG.warn("Exception while registering uri [{0}] for notifications", missingDep, e);
284                }
285            }
286        }
287    
288        private void unregisterAvailableDependencies(List<String> availableDeps) {
289            URIHandlerService uriService = Services.get().get(URIHandlerService.class);
290            for (String availableDep : availableDeps) {
291                try {
292                    URI availableURI = new URI(availableDep);
293                    URIHandler handler = uriService.getURIHandler(availableURI);
294                    if (handler.unregisterFromNotification(availableURI, actionId)) {
295                        LOG.debug("Successfully unregistered uri [{0}] from notifications", availableURI);
296                    }
297                    else {
298                        LOG.warn("Unable to unregister uri [{0}] from notifications", availableURI);
299                    }
300                }
301                catch (Exception e) {
302                    LOG.warn("Exception while unregistering uri [{0}] from notifications", availableDep, e);
303                }
304            }
305        }
306    
307        public static void unregisterMissingDependencies(List<String> missingDeps, String actionId) {
308            final XLog LOG = XLog.getLog(CoordPushDependencyCheckXCommand.class);
309            URIHandlerService uriService = Services.get().get(URIHandlerService.class);
310            for (String missingDep : missingDeps) {
311                try {
312                    URI missingURI = new URI(missingDep);
313                    URIHandler handler = uriService.getURIHandler(missingURI);
314                    if (handler.unregisterFromNotification(missingURI, actionId)) {
315                        LOG.debug("Successfully unregistered uri [{0}] from notifications", missingURI);
316                    }
317                    else {
318                        LOG.warn("Unable to unregister uri [{0}] from notifications", missingURI);
319                    }
320                }
321                catch (Exception e) {
322                    LOG.warn("Exception while unregistering uri [{0}] from notifications", missingDep, e);
323                }
324            }
325        }
326    
327        @Override
328        public String getEntityKey() {
329            return coordAction.getJobId();
330        }
331    
332        @Override
333        public String getKey(){
334            return getName() + "_" + actionId;
335        }
336    
337        @Override
338        protected boolean isLockRequired() {
339            return true;
340        }
341    
342        @Override
343        protected void eagerLoadState() throws CommandException {
344            try {
345                jpaService = Services.get().get(JPAService.class);
346    
347                if (jpaService != null) {
348                    coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
349                    coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
350                    LogUtils.setLogInfo(coordAction, logInfo);
351                }
352                else {
353                    throw new CommandException(ErrorCode.E0610);
354                }
355            }
356            catch (XException ex) {
357                throw new CommandException(ex);
358            }
359        }
360    
361        @Override
362        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
363            if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
364                throw new PreconditionException(ErrorCode.E1100, "[" + actionId
365                        + "]::CoordPushDependencyCheck:: Ignoring action. Should be in WAITING state, but state="
366                        + coordAction.getStatus());
367            }
368    
369            // if eligible to do action input check when running with backward
370            // support is true
371            if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
372                return;
373            }
374    
375            if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR
376                    && coordJob.getStatus() != Job.Status.PAUSED && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
377                throw new PreconditionException(ErrorCode.E1100, "[" + actionId
378                        + "]::CoordPushDependencyCheck:: Ignoring action."
379                        + " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
380                        + coordJob.getStatus());
381            }
382        }
383    
384        @Override
385        protected void loadState() throws CommandException {
386            eagerLoadState();
387        }
388    
389        @Override
390        protected void verifyPrecondition() throws CommandException, PreconditionException {
391            eagerVerifyPrecondition();
392        }
393    
394    }