This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.util.Date;
021 import java.util.HashMap;
022 import java.util.List;
023 import java.util.Map;
024
025 import org.apache.oozie.BundleActionBean;
026 import org.apache.oozie.BundleJobBean;
027 import org.apache.oozie.CoordinatorJobBean;
028 import org.apache.oozie.ErrorCode;
029 import org.apache.oozie.XException;
030 import org.apache.oozie.client.Job;
031 import org.apache.oozie.client.rest.RestConstants;
032 import org.apache.oozie.command.CommandException;
033 import org.apache.oozie.command.RerunTransitionXCommand;
034 import org.apache.oozie.command.coord.CoordRerunXCommand;
035 import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
036 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
037 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
038 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
039 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
040 import org.apache.oozie.executor.jpa.JPAExecutorException;
041 import org.apache.oozie.service.JPAService;
042 import org.apache.oozie.service.Services;
043 import org.apache.oozie.util.DateUtils;
044 import org.apache.oozie.util.LogUtils;
045 import org.apache.oozie.util.ParamChecker;
046 import org.apache.oozie.util.XLog;
047
048 /**
049 * Rerun bundle coordinator jobs by a list of coordinator names or dates. User can specify if refresh or noCleanup.
050 * <p/>
051 * The "refresh" is used to indicate if user wants to refresh an action's input/outpur dataset urls
052 * <p/>
053 * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions
054 */
055 public class BundleRerunXCommand extends RerunTransitionXCommand<Void> {
056
057 private final String coordScope;
058 private final String dateScope;
059 private final boolean refresh;
060 private final boolean noCleanup;
061 private BundleJobBean bundleJob;
062 private List<BundleActionBean> bundleActions;
063 protected boolean prevPending;
064
065 private JPAService jpaService = null;
066
067 /**
068 * The constructor for class {@link BundleRerunXCommand}
069 *
070 * @param jobId the bundle job id
071 * @param coordScope the rerun scope for coordinator job names separated by ","
072 * @param dateScope the rerun scope for coordinator nominal times separated by ","
073 * @param refresh true if user wants to refresh input/outpur dataset urls
074 * @param noCleanup false if user wants to cleanup output events for given rerun actions
075 */
076 public BundleRerunXCommand(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) {
077 super("bundle_rerun", "bundle_rerun", 1);
078 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
079 this.coordScope = coordScope;
080 this.dateScope = dateScope;
081 this.refresh = refresh;
082 this.noCleanup = noCleanup;
083 }
084
085 /* (non-Javadoc)
086 * @see org.apache.oozie.command.XCommand#loadState()
087 */
088 @Override
089 protected void loadState() throws CommandException {
090 try {
091 jpaService = Services.get().get(JPAService.class);
092
093 if (jpaService != null) {
094 this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
095 this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
096 LogUtils.setLogInfo(bundleJob, logInfo);
097 super.setJob(bundleJob);
098 prevPending = bundleJob.isPending();
099 }
100 else {
101 throw new CommandException(ErrorCode.E0610);
102 }
103 }
104 catch (XException ex) {
105 throw new CommandException(ex);
106 }
107
108 }
109
110 /* (non-Javadoc)
111 * @see org.apache.oozie.command.RerunTransitionXCommand#rerunChildren()
112 */
113 @Override
114 public void rerunChildren() throws CommandException {
115 boolean isUpdateActionDone = false;
116 Map<String, BundleActionBean> coordNameToBAMapping = new HashMap<String, BundleActionBean>();
117 if (bundleActions != null) {
118 for (BundleActionBean action : bundleActions) {
119 if (action.getCoordName() != null) {
120 coordNameToBAMapping.put(action.getCoordName(), action);
121 }
122 }
123 }
124
125 if (coordScope != null && !coordScope.isEmpty()) {
126 String[] list = coordScope.split(",");
127 for (String coordName : list) {
128 coordName = coordName.trim();
129 if (coordNameToBAMapping.keySet().contains(coordName)) {
130 String coordId = coordNameToBAMapping.get(coordName).getCoordId();
131 if (coordId == null) {
132 LOG.info("No coord id found. Therefore, nothing to queue for coord rerun for coordname: " + coordName);
133 continue;
134 }
135 CoordinatorJobBean coordJob = getCoordJob(coordId);
136
137 String rerunDateScope;
138 if (dateScope != null && !dateScope.isEmpty()) {
139 rerunDateScope = dateScope;
140 }
141 else {
142 String coordStart = DateUtils.convertDateToString(coordJob.getStartTime());
143 String coordEnd = DateUtils.convertDateToString(coordJob.getEndTime());
144 rerunDateScope = coordStart + "::" + coordEnd;
145 }
146 LOG.debug("Queuing rerun range [" + rerunDateScope + "] for coord id " + coordId + " of bundle "
147 + bundleJob.getId());
148 queue(new CoordRerunXCommand(coordId, RestConstants.JOB_COORD_RERUN_DATE, rerunDateScope, refresh,
149 noCleanup));
150 updateBundleAction(coordNameToBAMapping.get(coordName));
151 isUpdateActionDone = true;
152 }
153 else {
154 LOG.info("Rerun for coord " + coordName + " NOT performed because it is not in bundle ", bundleJob.getId());
155 }
156 }
157 }
158 else if (dateScope != null && !dateScope.isEmpty()) {
159 if (bundleActions != null) {
160 for (BundleActionBean action : bundleActions) {
161 if (action.getCoordId() == null) {
162 LOG.info("No coord id found. Therefore nothing to queue for coord rerun with coord name "
163 + action.getCoordName());
164 continue;
165 }
166 LOG.debug("Queuing rerun range [" + dateScope + "] for coord id " + action.getCoordId() + " of bundle "
167 + bundleJob.getId());
168 queue(new CoordRerunXCommand(action.getCoordId(), RestConstants.JOB_COORD_RERUN_DATE, dateScope,
169 refresh, noCleanup));
170 updateBundleAction(action);
171 isUpdateActionDone = true;
172 }
173 }
174 }
175 if (!isUpdateActionDone) {
176 transitToPrevious();
177 }
178 LOG.info("Rerun coord jobs for the bundle=[{0}]", jobId);
179 }
180
181 private final void transitToPrevious() throws CommandException {
182 bundleJob.setStatus(getPrevStatus());
183 if (!prevPending) {
184 bundleJob.resetPending();
185 }
186 else {
187 bundleJob.setPending();
188 }
189 updateJob();
190 }
191
192 /**
193 * Update bundle action
194 *
195 * @param action the bundle action
196 * @throws CommandException thrown if failed to update bundle action
197 */
198 private void updateBundleAction(BundleActionBean action) throws CommandException {
199 action.incrementAndGetPending();
200 action.setLastModifiedTime(new Date());
201 try {
202 jpaService.execute(new BundleActionUpdateJPAExecutor(action));
203 }
204 catch (JPAExecutorException je) {
205 throw new CommandException(je);
206 }
207 }
208
209 /* (non-Javadoc)
210 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
211 */
212 @Override
213 public void updateJob() throws CommandException {
214 try {
215 // rerun a paused bundle job will keep job status at paused and pending at previous pending
216 if (getPrevStatus()!= null && getPrevStatus().equals(Job.Status.PAUSED)) {
217 bundleJob.setStatus(Job.Status.PAUSED);
218 if (prevPending) {
219 bundleJob.setPending();
220 } else {
221 bundleJob.resetPending();
222 }
223 }
224 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
225 }
226 catch (JPAExecutorException je) {
227 throw new CommandException(je);
228 }
229
230 }
231
232 /* (non-Javadoc)
233 * @see org.apache.oozie.command.XCommand#getEntityKey()
234 */
235 @Override
236 protected String getEntityKey() {
237 return jobId;
238 }
239
240 /* (non-Javadoc)
241 * @see org.apache.oozie.command.XCommand#isLockRequired()
242 */
243 @Override
244 protected boolean isLockRequired() {
245 return true;
246 }
247
248 /*
249 * (non-Javadoc)
250 * @see org.apache.oozie.command.TransitionXCommand#getJob()
251 */
252 @Override
253 public Job getJob() {
254 return bundleJob;
255 }
256
257 /* (non-Javadoc)
258 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
259 */
260 @Override
261 public void notifyParent() throws CommandException {
262
263 }
264
265 /* (non-Javadoc)
266 * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
267 */
268 @Override
269 public XLog getLog() {
270 return LOG;
271 }
272
273 private final CoordinatorJobBean getCoordJob(String coordId) throws CommandException {
274 try {
275 CoordinatorJobBean job = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
276 return job;
277 }
278 catch (JPAExecutorException je) {
279 throw new CommandException(je);
280 }
281
282 }
283
284 }