001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import java.util.Date;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024
025import org.apache.oozie.BundleActionBean;
026import org.apache.oozie.BundleJobBean;
027import org.apache.oozie.CoordinatorJobBean;
028import org.apache.oozie.XException;
029import org.apache.oozie.client.Job;
030import org.apache.oozie.client.rest.RestConstants;
031import org.apache.oozie.command.CommandException;
032import org.apache.oozie.command.RerunTransitionXCommand;
033import org.apache.oozie.command.coord.CoordRerunXCommand;
034import org.apache.oozie.executor.jpa.BatchQueryExecutor;
035import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
036import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
037import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
038import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
040import org.apache.oozie.executor.jpa.JPAExecutorException;
041import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
042import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
043import org.apache.oozie.util.DateUtils;
044import org.apache.oozie.util.LogUtils;
045import org.apache.oozie.util.ParamChecker;
046import org.apache.oozie.util.XLog;
047
048/**
049 * Rerun bundle coordinator jobs by a list of coordinator names or dates. User can specify if refresh or noCleanup.
050 * <p/>
051 * The "refresh" is used to indicate if user wants to refresh an action's input/outpur dataset urls
052 * <p/>
053 * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions
054 */
055public class BundleRerunXCommand extends RerunTransitionXCommand<Void> {
056
057    private final String coordScope;
058    private final String dateScope;
059    private final boolean refresh;
060    private final boolean noCleanup;
061    private BundleJobBean bundleJob;
062    private List<BundleActionBean> bundleActions;
063    protected boolean prevPending;
064
065    /**
066     * The constructor for class {@link BundleRerunXCommand}
067     *
068     * @param jobId the bundle job id
069     * @param coordScope the rerun scope for coordinator job names separated by ","
070     * @param dateScope the rerun scope for coordinator nominal times separated by ","
071     * @param refresh true if user wants to refresh input/outpur dataset urls
072     * @param noCleanup false if user wants to cleanup output events for given rerun actions
073     */
074    public BundleRerunXCommand(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) {
075        super("bundle_rerun", "bundle_rerun", 1);
076        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
077        this.coordScope = coordScope;
078        this.dateScope = dateScope;
079        this.refresh = refresh;
080        this.noCleanup = noCleanup;
081    }
082
083    /* (non-Javadoc)
084     * @see org.apache.oozie.command.XCommand#loadState()
085     */
086    @Override
087    protected void loadState() throws CommandException {
088        try {
089            this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, jobId);
090            this.bundleActions = BundleActionQueryExecutor.getInstance().getList(
091                    BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, jobId);
092            LogUtils.setLogInfo(bundleJob, logInfo);
093            super.setJob(bundleJob);
094            prevPending = bundleJob.isPending();
095        }
096        catch (XException ex) {
097            throw new CommandException(ex);
098        }
099
100    }
101
102    /* (non-Javadoc)
103     * @see org.apache.oozie.command.RerunTransitionXCommand#rerunChildren()
104     */
105    @Override
106    public void rerunChildren() throws CommandException {
107        boolean isUpdateActionDone = false;
108        Map<String, BundleActionBean> coordNameToBAMapping = new HashMap<String, BundleActionBean>();
109        if (bundleActions != null) {
110            for (BundleActionBean action : bundleActions) {
111                if (action.getCoordName() != null) {
112                    coordNameToBAMapping.put(action.getCoordName(), action);
113                }
114            }
115        }
116
117        if (coordScope != null && !coordScope.isEmpty()) {
118            String[] list = coordScope.split(",");
119            for (String coordName : list) {
120                coordName = coordName.trim();
121                if (coordNameToBAMapping.keySet().contains(coordName)) {
122                    String coordId = coordNameToBAMapping.get(coordName).getCoordId();
123                    if (coordId == null) {
124                        LOG.info("No coord id found. Therefore, nothing to queue for coord rerun for coordname: " + coordName);
125                        continue;
126                    }
127                    CoordinatorJobBean coordJob = getCoordJob(coordId);
128
129                    String rerunDateScope;
130                    if (dateScope != null && !dateScope.isEmpty()) {
131                        rerunDateScope = dateScope;
132                    }
133                    else {
134                        String coordStart = DateUtils.formatDateOozieTZ(coordJob.getStartTime());
135                        String coordEnd = DateUtils.formatDateOozieTZ(coordJob.getEndTime());
136                        rerunDateScope = coordStart + "::" + coordEnd;
137                    }
138                    LOG.debug("Queuing rerun range [" + rerunDateScope + "] for coord id " + coordId + " of bundle "
139                            + bundleJob.getId());
140                    queue(new CoordRerunXCommand(coordId, RestConstants.JOB_COORD_SCOPE_DATE, rerunDateScope, refresh,
141                            noCleanup));
142                    updateBundleAction(coordNameToBAMapping.get(coordName));
143                    isUpdateActionDone = true;
144                }
145                else {
146                    LOG.info("Rerun for coord " + coordName + " NOT performed because it is not in bundle ", bundleJob.getId());
147                }
148            }
149        }
150        else if (dateScope != null && !dateScope.isEmpty()) {
151            if (bundleActions != null) {
152                for (BundleActionBean action : bundleActions) {
153                    if (action.getCoordId() == null) {
154                        LOG.info("No coord id found. Therefore nothing to queue for coord rerun with coord name "
155                                + action.getCoordName());
156                        continue;
157                    }
158                    LOG.debug("Queuing rerun range [" + dateScope + "] for coord id " + action.getCoordId() + " of bundle "
159                            + bundleJob.getId());
160                    queue(new CoordRerunXCommand(action.getCoordId(), RestConstants.JOB_COORD_SCOPE_DATE, dateScope,
161                            refresh, noCleanup));
162                    updateBundleAction(action);
163                    isUpdateActionDone = true;
164                }
165            }
166        }
167        if (!isUpdateActionDone) {
168            transitToPrevious();
169        }
170        LOG.info("Rerun coord jobs for the bundle=[{0}]", jobId);
171    }
172
173    private final void transitToPrevious() throws CommandException {
174        bundleJob.setStatus(getPrevStatus());
175        if (!prevPending) {
176            bundleJob.resetPending();
177        }
178        else {
179            bundleJob.setPending();
180        }
181        updateJob();
182    }
183
184    /**
185     * Update bundle action
186     *
187     * @param action the bundle action
188     * @throws CommandException thrown if failed to update bundle action
189     */
190    private void updateBundleAction(BundleActionBean action) {
191        action.incrementAndGetPending();
192        action.setLastModifiedTime(new Date());
193        updateList.add(new UpdateEntry<BundleActionQuery>(BundleActionQuery.UPDATE_BUNDLE_ACTION_PENDING_MODTIME, action));
194    }
195
196    /* (non-Javadoc)
197     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
198     */
199    @Override
200    public void updateJob() {
201        // rerun a paused bundle job will keep job status at paused and pending at previous pending
202        if (getPrevStatus() != null) {
203            Job.Status bundleJobStatus = getPrevStatus();
204            if (bundleJobStatus.equals(Job.Status.PAUSED) || bundleJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
205                bundleJob.setStatus(bundleJobStatus);
206                if (prevPending) {
207                    bundleJob.setPending();
208                }
209                else {
210                    bundleJob.resetPending();
211                }
212            }
213        }
214        updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob));
215    }
216
217    /* (non-Javadoc)
218     * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
219     */
220    @Override
221    public void performWrites() throws CommandException {
222        try {
223            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
224        }
225        catch (JPAExecutorException e) {
226            throw new CommandException(e);
227        }
228    }
229
230    /* (non-Javadoc)
231     * @see org.apache.oozie.command.XCommand#getEntityKey()
232     */
233    @Override
234    public String getEntityKey() {
235        return jobId;
236    }
237
238    /* (non-Javadoc)
239     * @see org.apache.oozie.command.XCommand#isLockRequired()
240     */
241    @Override
242    protected boolean isLockRequired() {
243        return true;
244    }
245
246    /*
247     * (non-Javadoc)
248     * @see org.apache.oozie.command.TransitionXCommand#getJob()
249     */
250    @Override
251    public Job getJob() {
252        return bundleJob;
253    }
254
255    /* (non-Javadoc)
256     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
257     */
258    @Override
259    public void notifyParent() throws CommandException {
260
261    }
262
263    /* (non-Javadoc)
264     * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
265     */
266    @Override
267    public XLog getLog() {
268        return LOG;
269    }
270
271    private final CoordinatorJobBean getCoordJob(String coordId) throws CommandException {
272        try {
273            CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coordId);
274            return job;
275        }
276        catch (JPAExecutorException je) {
277            throw new CommandException(je);
278        }
279
280    }
281
282}