001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.bundle;
020
021import java.util.Date;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.oozie.BundleActionBean;
028import org.apache.oozie.BundleJobBean;
029import org.apache.oozie.CoordinatorJobBean;
030import org.apache.oozie.XException;
031import org.apache.oozie.client.Job;
032import org.apache.oozie.client.rest.RestConstants;
033import org.apache.oozie.command.CommandException;
034import org.apache.oozie.command.RerunTransitionXCommand;
035import org.apache.oozie.command.coord.CoordRerunXCommand;
036import org.apache.oozie.executor.jpa.BatchQueryExecutor;
037import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
038import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
039import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
040import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
041import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
042import org.apache.oozie.executor.jpa.JPAExecutorException;
043import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
044import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
045import org.apache.oozie.util.DateUtils;
046import org.apache.oozie.util.LogUtils;
047import org.apache.oozie.util.ParamChecker;
048import org.apache.oozie.util.XLog;
049
050/**
051 * Rerun bundle coordinator jobs by a list of coordinator names or dates. User can specify if refresh or noCleanup.
052 * <p>
 * The "refresh" flag indicates whether the user wants to refresh an action's input/output dataset urls
 * <p>
 * The "noCleanup" flag indicates whether the user wants to skip the cleanup of output events for given rerun actions
056 */
057public class BundleRerunXCommand extends RerunTransitionXCommand<Void> {
058
059    private final String coordScope;
060    private final String dateScope;
061    private final boolean refresh;
062    private final boolean noCleanup;
063    private BundleJobBean bundleJob;
064    private List<BundleActionBean> bundleActions;
065    protected boolean prevPending;
066
067    /**
068     * The constructor for class {@link BundleRerunXCommand}
069     *
070     * @param jobId the bundle job id
071     * @param coordScope the rerun scope for coordinator job names separated by ","
072     * @param dateScope the rerun scope for coordinator nominal times separated by ","
073     * @param refresh true if user wants to refresh input/outpur dataset urls
074     * @param noCleanup false if user wants to cleanup output events for given rerun actions
075     */
076    public BundleRerunXCommand(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) {
077        super("bundle_rerun", "bundle_rerun", 1);
078        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
079        this.coordScope = coordScope;
080        this.dateScope = dateScope;
081        this.refresh = refresh;
082        this.noCleanup = noCleanup;
083    }
084
085    /* (non-Javadoc)
086     * @see org.apache.oozie.command.XCommand#loadState()
087     */
088    @Override
089    protected void loadState() throws CommandException {
090        try {
091            this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, jobId);
092            this.bundleActions = BundleActionQueryExecutor.getInstance().getList(
093                    BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, jobId);
094            LogUtils.setLogInfo(bundleJob);
095            super.setJob(bundleJob);
096            prevPending = bundleJob.isPending();
097        }
098        catch (XException ex) {
099            throw new CommandException(ex);
100        }
101
102    }
103
104    /* (non-Javadoc)
105     * @see org.apache.oozie.command.RerunTransitionXCommand#rerunChildren()
106     */
107    @Override
108    public void rerunChildren() throws CommandException {
109        boolean isUpdateActionDone = false;
110        Map<String, BundleActionBean> coordNameToBAMapping = new HashMap<String, BundleActionBean>();
111        if (bundleActions != null) {
112            for (BundleActionBean action : bundleActions) {
113                if (action.getCoordName() != null) {
114                    coordNameToBAMapping.put(action.getCoordName(), action);
115                }
116            }
117        }
118
119        if (coordScope != null && !coordScope.isEmpty()) {
120            String[] list = coordScope.split(",");
121            for (String coordName : list) {
122                coordName = coordName.trim();
123                if (coordNameToBAMapping.keySet().contains(coordName)) {
124                    String coordId = coordNameToBAMapping.get(coordName).getCoordId();
125                    if (coordId == null) {
126                        LOG.info("No coord id found. Therefore, nothing to queue for coord rerun for coordname: " + coordName);
127                        continue;
128                    }
129                    CoordinatorJobBean coordJob = getCoordJob(coordId);
130
131                    String rerunDateScope;
132                    if (dateScope != null && !dateScope.isEmpty()) {
133                        rerunDateScope = dateScope;
134                    }
135                    else {
136                        String coordStart = DateUtils.formatDateOozieTZ(coordJob.getStartTime());
137                        String coordEnd = DateUtils.formatDateOozieTZ(coordJob.getEndTime());
138                        rerunDateScope = coordStart + "::" + coordEnd;
139                    }
140                    LOG.debug("Queuing rerun range [" + rerunDateScope + "] for coord id " + coordId + " of bundle "
141                            + bundleJob.getId());
142                    queue(new CoordRerunXCommand(coordId, RestConstants.JOB_COORD_SCOPE_DATE, rerunDateScope, refresh,
143                            noCleanup, false, null));
144                    updateBundleAction(coordNameToBAMapping.get(coordName));
145                    isUpdateActionDone = true;
146                }
147                else {
148                    LOG.info("Rerun for coord " + coordName + " NOT performed because it is not in bundle ", bundleJob.getId());
149                }
150            }
151        }
152        else if (dateScope != null && !dateScope.isEmpty()) {
153            if (bundleActions != null) {
154                for (BundleActionBean action : bundleActions) {
155                    if (action.getCoordId() == null) {
156                        LOG.info("No coord id found. Therefore nothing to queue for coord rerun with coord name "
157                                + action.getCoordName());
158                        continue;
159                    }
160                    LOG.debug("Queuing rerun range [" + dateScope + "] for coord id " + action.getCoordId() + " of bundle "
161                            + bundleJob.getId());
162                    queue(new CoordRerunXCommand(action.getCoordId(), RestConstants.JOB_COORD_SCOPE_DATE, dateScope,
163                            refresh, noCleanup, false, null));
164                    updateBundleAction(action);
165                    isUpdateActionDone = true;
166                }
167            }
168        }
169        if (!isUpdateActionDone) {
170            transitToPrevious();
171        }
172        LOG.info("Rerun coord jobs for the bundle=[{0}]", jobId);
173    }
174
175    private final void transitToPrevious() throws CommandException {
176        bundleJob.setStatus(getPrevStatus());
177        if (!prevPending) {
178            bundleJob.resetPending();
179        }
180        else {
181            bundleJob.setPending();
182        }
183        updateJob();
184    }
185
186    /**
187     * Update bundle action
188     *
189     * @param action the bundle action
190     * @throws CommandException thrown if failed to update bundle action
191     */
192    private void updateBundleAction(BundleActionBean action) {
193        action.incrementAndGetPending();
194        action.setLastModifiedTime(new Date());
195        updateList.add(new UpdateEntry<BundleActionQuery>(BundleActionQuery.UPDATE_BUNDLE_ACTION_PENDING_MODTIME, action));
196    }
197
198    /* (non-Javadoc)
199     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
200     */
201    @Override
202    public void updateJob() {
203        // rerun a paused bundle job will keep job status at paused and pending at previous pending
204        if (getPrevStatus() != null) {
205            Job.Status bundleJobStatus = getPrevStatus();
206            if (bundleJobStatus.equals(Job.Status.PAUSED) || bundleJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
207                bundleJob.setStatus(bundleJobStatus);
208                if (prevPending) {
209                    bundleJob.setPending();
210                }
211                else {
212                    bundleJob.resetPending();
213                }
214            }
215        }
216        updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob));
217    }
218
219    /* (non-Javadoc)
220     * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
221     */
222    @Override
223    public void performWrites() throws CommandException {
224        try {
225            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
226        }
227        catch (JPAExecutorException e) {
228            throw new CommandException(e);
229        }
230    }
231
232    /* (non-Javadoc)
233     * @see org.apache.oozie.command.XCommand#getEntityKey()
234     */
235    @Override
236    public String getEntityKey() {
237        return jobId;
238    }
239
240    /* (non-Javadoc)
241     * @see org.apache.oozie.command.XCommand#isLockRequired()
242     */
243    @Override
244    protected boolean isLockRequired() {
245        return true;
246    }
247
248    /*
249     * (non-Javadoc)
250     * @see org.apache.oozie.command.TransitionXCommand#getJob()
251     */
252    @Override
253    public Job getJob() {
254        return bundleJob;
255    }
256
257    /* (non-Javadoc)
258     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
259     */
260    @Override
261    public void notifyParent() throws CommandException {
262
263    }
264
265    /* (non-Javadoc)
266     * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
267     */
268    @Override
269    public XLog getLog() {
270        return LOG;
271    }
272
273    private final CoordinatorJobBean getCoordJob(String coordId) throws CommandException {
274        try {
275            CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coordId);
276            return job;
277        }
278        catch (JPAExecutorException je) {
279            throw new CommandException(je);
280        }
281
282    }
283
284}