This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.util.Date;
021 import java.util.HashMap;
022 import java.util.List;
023 import java.util.Map;
024
025 import org.apache.oozie.BundleActionBean;
026 import org.apache.oozie.BundleJobBean;
027 import org.apache.oozie.CoordinatorJobBean;
028 import org.apache.oozie.ErrorCode;
029 import org.apache.oozie.XException;
030 import org.apache.oozie.client.Job;
031 import org.apache.oozie.client.rest.RestConstants;
032 import org.apache.oozie.command.CommandException;
033 import org.apache.oozie.command.RerunTransitionXCommand;
034 import org.apache.oozie.command.coord.CoordRerunXCommand;
035 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
036 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
037 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
038 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
039 import org.apache.oozie.executor.jpa.JPAExecutorException;
040 import org.apache.oozie.service.JPAService;
041 import org.apache.oozie.service.Services;
042 import org.apache.oozie.util.DateUtils;
043 import org.apache.oozie.util.LogUtils;
044 import org.apache.oozie.util.ParamChecker;
045 import org.apache.oozie.util.XLog;
046
047 /**
048 * Rerun bundle coordinator jobs by a list of coordinator names or dates. User can specify if refresh or noCleanup.
049 * <p/>
050 * The "refresh" is used to indicate if user wants to refresh an action's input/outpur dataset urls
051 * <p/>
052 * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions
053 */
054 public class BundleRerunXCommand extends RerunTransitionXCommand<Void> {
055
056 private final String coordScope;
057 private final String dateScope;
058 private final boolean refresh;
059 private final boolean noCleanup;
060 private BundleJobBean bundleJob;
061 private List<BundleActionBean> bundleActions;
062 protected boolean prevPending;
063
064 private JPAService jpaService = null;
065
066 /**
067 * The constructor for class {@link BundleRerunXCommand}
068 *
069 * @param jobId the bundle job id
070 * @param coordScope the rerun scope for coordinator job names separated by ","
071 * @param dateScope the rerun scope for coordinator nominal times separated by ","
072 * @param refresh true if user wants to refresh input/outpur dataset urls
073 * @param noCleanup false if user wants to cleanup output events for given rerun actions
074 */
075 public BundleRerunXCommand(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) {
076 super("bundle_rerun", "bundle_rerun", 1);
077 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
078 this.coordScope = coordScope;
079 this.dateScope = dateScope;
080 this.refresh = refresh;
081 this.noCleanup = noCleanup;
082 }
083
084 /* (non-Javadoc)
085 * @see org.apache.oozie.command.XCommand#loadState()
086 */
087 @Override
088 protected void loadState() throws CommandException {
089 try {
090 jpaService = Services.get().get(JPAService.class);
091
092 if (jpaService != null) {
093 this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
094 this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
095 LogUtils.setLogInfo(bundleJob, logInfo);
096 super.setJob(bundleJob);
097 prevPending = bundleJob.isPending();
098 }
099 else {
100 throw new CommandException(ErrorCode.E0610);
101 }
102 }
103 catch (XException ex) {
104 throw new CommandException(ex);
105 }
106
107 }
108
109 /* (non-Javadoc)
110 * @see org.apache.oozie.command.RerunTransitionXCommand#rerunChildren()
111 */
112 @Override
113 public void rerunChildren() throws CommandException {
114 boolean isUpdateActionDone = false;
115 Map<String, BundleActionBean> coordNameToBAMapping = new HashMap<String, BundleActionBean>();
116 if (bundleActions != null) {
117 for (BundleActionBean action : bundleActions) {
118 if (action.getCoordName() != null) {
119 coordNameToBAMapping.put(action.getCoordName(), action);
120 }
121 }
122 }
123
124 if (coordScope != null && !coordScope.isEmpty()) {
125 String[] list = coordScope.split(",");
126 for (String coordName : list) {
127 coordName = coordName.trim();
128 if (coordNameToBAMapping.keySet().contains(coordName)) {
129 String coordId = coordNameToBAMapping.get(coordName).getCoordId();
130 if (coordId == null) {
131 LOG.info("No coord id found. Therefore, nothing to queue for coord rerun for coordname: " + coordName);
132 continue;
133 }
134 CoordinatorJobBean coordJob = getCoordJob(coordId);
135
136 String rerunDateScope;
137 if (dateScope != null && !dateScope.isEmpty()) {
138 rerunDateScope = dateScope;
139 }
140 else {
141 String coordStart = DateUtils.formatDateOozieTZ(coordJob.getStartTime());
142 String coordEnd = DateUtils.formatDateOozieTZ(coordJob.getEndTime());
143 rerunDateScope = coordStart + "::" + coordEnd;
144 }
145 LOG.debug("Queuing rerun range [" + rerunDateScope + "] for coord id " + coordId + " of bundle "
146 + bundleJob.getId());
147 queue(new CoordRerunXCommand(coordId, RestConstants.JOB_COORD_RERUN_DATE, rerunDateScope, refresh,
148 noCleanup));
149 updateBundleAction(coordNameToBAMapping.get(coordName));
150 isUpdateActionDone = true;
151 }
152 else {
153 LOG.info("Rerun for coord " + coordName + " NOT performed because it is not in bundle ", bundleJob.getId());
154 }
155 }
156 }
157 else if (dateScope != null && !dateScope.isEmpty()) {
158 if (bundleActions != null) {
159 for (BundleActionBean action : bundleActions) {
160 if (action.getCoordId() == null) {
161 LOG.info("No coord id found. Therefore nothing to queue for coord rerun with coord name "
162 + action.getCoordName());
163 continue;
164 }
165 LOG.debug("Queuing rerun range [" + dateScope + "] for coord id " + action.getCoordId() + " of bundle "
166 + bundleJob.getId());
167 queue(new CoordRerunXCommand(action.getCoordId(), RestConstants.JOB_COORD_RERUN_DATE, dateScope,
168 refresh, noCleanup));
169 updateBundleAction(action);
170 isUpdateActionDone = true;
171 }
172 }
173 }
174 if (!isUpdateActionDone) {
175 transitToPrevious();
176 }
177 LOG.info("Rerun coord jobs for the bundle=[{0}]", jobId);
178 }
179
180 private final void transitToPrevious() throws CommandException {
181 bundleJob.setStatus(getPrevStatus());
182 if (!prevPending) {
183 bundleJob.resetPending();
184 }
185 else {
186 bundleJob.setPending();
187 }
188 updateJob();
189 }
190
191 /**
192 * Update bundle action
193 *
194 * @param action the bundle action
195 * @throws CommandException thrown if failed to update bundle action
196 */
197 private void updateBundleAction(BundleActionBean action) {
198 action.incrementAndGetPending();
199 action.setLastModifiedTime(new Date());
200 updateList.add(action);
201 }
202
203 /* (non-Javadoc)
204 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
205 */
206 @Override
207 public void updateJob() {
208 // rerun a paused bundle job will keep job status at paused and pending at previous pending
209 if (getPrevStatus() != null) {
210 Job.Status bundleJobStatus = getPrevStatus();
211 if (bundleJobStatus.equals(Job.Status.PAUSED) || bundleJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
212 bundleJob.setStatus(bundleJobStatus);
213 if (prevPending) {
214 bundleJob.setPending();
215 }
216 else {
217 bundleJob.resetPending();
218 }
219 }
220 }
221 updateList.add(bundleJob);
222 }
223
224 /* (non-Javadoc)
225 * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
226 */
227 @Override
228 public void performWrites() throws CommandException {
229 try {
230 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
231 }
232 catch (JPAExecutorException e) {
233 throw new CommandException(e);
234 }
235 }
236
237 /* (non-Javadoc)
238 * @see org.apache.oozie.command.XCommand#getEntityKey()
239 */
240 @Override
241 public String getEntityKey() {
242 return jobId;
243 }
244
245 /* (non-Javadoc)
246 * @see org.apache.oozie.command.XCommand#isLockRequired()
247 */
248 @Override
249 protected boolean isLockRequired() {
250 return true;
251 }
252
253 /*
254 * (non-Javadoc)
255 * @see org.apache.oozie.command.TransitionXCommand#getJob()
256 */
257 @Override
258 public Job getJob() {
259 return bundleJob;
260 }
261
262 /* (non-Javadoc)
263 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
264 */
265 @Override
266 public void notifyParent() throws CommandException {
267
268 }
269
270 /* (non-Javadoc)
271 * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
272 */
273 @Override
274 public XLog getLog() {
275 return LOG;
276 }
277
278 private final CoordinatorJobBean getCoordJob(String coordId) throws CommandException {
279 try {
280 CoordinatorJobBean job = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
281 return job;
282 }
283 catch (JPAExecutorException je) {
284 throw new CommandException(je);
285 }
286
287 }
288
289 }