// NOTE: This project has been retired. For details, please refer to its
// Apache Attic page.
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018 package org.apache.oozie.command.bundle;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.Date;
023 import java.util.HashMap;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Map.Entry;
027
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.oozie.BundleActionBean;
030 import org.apache.oozie.BundleJobBean;
031 import org.apache.oozie.ErrorCode;
032 import org.apache.oozie.XException;
033 import org.apache.oozie.client.Job;
034 import org.apache.oozie.client.OozieClient;
035 import org.apache.oozie.command.CommandException;
036 import org.apache.oozie.command.PreconditionException;
037 import org.apache.oozie.command.StartTransitionXCommand;
038 import org.apache.oozie.command.coord.CoordSubmitXCommand;
039 import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
040 import org.apache.oozie.executor.jpa.BundleActionInsertJPAExecutor;
041 import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
042 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
043 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
044 import org.apache.oozie.executor.jpa.JPAExecutorException;
045 import org.apache.oozie.service.JPAService;
046 import org.apache.oozie.service.Services;
047 import org.apache.oozie.util.JobUtils;
048 import org.apache.oozie.util.LogUtils;
049 import org.apache.oozie.util.ParamChecker;
050 import org.apache.oozie.util.XConfiguration;
051 import org.apache.oozie.util.XmlUtils;
052 import org.jdom.Attribute;
053 import org.jdom.Element;
054 import org.jdom.JDOMException;
055
056 /**
057 * The command to start Bundle job
058 */
059 public class BundleStartXCommand extends StartTransitionXCommand {
060 private final String jobId;
061 private BundleJobBean bundleJob;
062 private JPAService jpaService = null;
063
064 /**
065 * The constructor for class {@link BundleStartXCommand}
066 *
067 * @param jobId the bundle job id
068 */
069 public BundleStartXCommand(String jobId) {
070 super("bundle_start", "bundle_start", 1);
071 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
072 }
073
074 /**
075 * The constructor for class {@link BundleStartXCommand}
076 *
077 * @param jobId the bundle job id
078 * @param dryrun true if dryrun is enable
079 */
080 public BundleStartXCommand(String jobId, boolean dryrun) {
081 super("bundle_start", "bundle_start", 1, dryrun);
082 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
083 }
084
085 /* (non-Javadoc)
086 * @see org.apache.oozie.command.XCommand#getEntityKey()
087 */
088 @Override
089 protected String getEntityKey() {
090 return jobId;
091 }
092
093 /* (non-Javadoc)
094 * @see org.apache.oozie.command.XCommand#isLockRequired()
095 */
096 @Override
097 protected boolean isLockRequired() {
098 return true;
099 }
100
101 /* (non-Javadoc)
102 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
103 */
104 @Override
105 protected void verifyPrecondition() throws CommandException, PreconditionException {
106 eagerVerifyPrecondition();
107 }
108
109 /* (non-Javadoc)
110 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
111 */
112 @Override
113 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
114 if (bundleJob.getStatus() != Job.Status.PREP) {
115 String msg = "Bundle " + bundleJob.getId() + " is not in PREP status. It is in : " + bundleJob.getStatus();
116 LOG.info(msg);
117 throw new PreconditionException(ErrorCode.E1100, msg);
118 }
119 }
120 /* (non-Javadoc)
121 * @see org.apache.oozie.command.XCommand#loadState()
122 */
123 @Override
124 public void loadState() throws CommandException {
125 eagerLoadState();
126 }
127
128 /* (non-Javadoc)
129 * @see org.apache.oozie.command.XCommand#eagerLoadState()
130 */
131 @Override
132 public void eagerLoadState() throws CommandException {
133 try {
134 jpaService = Services.get().get(JPAService.class);
135
136 if (jpaService != null) {
137 this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
138 LogUtils.setLogInfo(bundleJob, logInfo);
139 super.setJob(bundleJob);
140
141 }
142 else {
143 throw new CommandException(ErrorCode.E0610);
144 }
145 }
146 catch (XException ex) {
147 throw new CommandException(ex);
148 }
149 }
150
151 /* (non-Javadoc)
152 * @see org.apache.oozie.command.StartTransitionXCommand#StartChildren()
153 */
154 @Override
155 public void StartChildren() throws CommandException {
156 LOG.debug("Started coord jobs for the bundle=[{0}]", jobId);
157 insertBundleActions();
158 startCoordJobs();
159 LOG.debug("Ended coord jobs for the bundle=[{0}]", jobId);
160 }
161
162 /* (non-Javadoc)
163 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
164 */
165 @Override
166 public void notifyParent() {
167 }
168
169 /**
170 * Insert bundle actions
171 *
172 * @throws CommandException thrown if failed to create bundle actions
173 */
174 @SuppressWarnings("unchecked")
175 private void insertBundleActions() throws CommandException {
176 if (bundleJob != null) {
177 Map<String, Boolean> map = new HashMap<String, Boolean>();
178 try {
179 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
180 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
181 for (Element elem : coordElems) {
182 Attribute name = elem.getAttribute("name");
183 Attribute critical = elem.getAttribute("critical");
184 if (name != null) {
185 if (map.containsKey(name.getValue())) {
186 throw new CommandException(ErrorCode.E1304, name);
187 }
188 boolean isCritical = false;
189 if (critical != null && Boolean.parseBoolean(critical.getValue())) {
190 isCritical = true;
191 }
192 map.put(name.getValue(), isCritical);
193 }
194 else {
195 throw new CommandException(ErrorCode.E1305);
196 }
197 }
198 }
199 catch (JDOMException jex) {
200 throw new CommandException(ErrorCode.E1301, jex);
201 }
202
203 try {
204 // if there is no coordinator for this bundle, failed it.
205 if (map.isEmpty()) {
206 bundleJob.setStatus(Job.Status.FAILED);
207 bundleJob.resetPending();
208 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
209 LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId);
210 throw new CommandException(ErrorCode.E1318, jobId);
211 }
212
213 for (Entry<String, Boolean> coordName : map.entrySet()) {
214 BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue());
215
216 jpaService.execute(new BundleActionInsertJPAExecutor(action));
217 }
218 }
219 catch (JPAExecutorException je) {
220 throw new CommandException(je);
221 }
222
223 }
224 else {
225 throw new CommandException(ErrorCode.E0604, jobId);
226 }
227 }
228
229 private BundleActionBean createBundleAction(String jobId, String coordName, boolean isCritical) {
230 BundleActionBean action = new BundleActionBean();
231 action.setBundleActionId(jobId + "_" + coordName);
232 action.setBundleId(jobId);
233 action.setCoordName(coordName);
234 action.setStatus(Job.Status.PREP);
235 action.setLastModifiedTime(new Date());
236 if (isCritical) {
237 action.setCritical();
238 }
239 else {
240 action.resetCritical();
241 }
242 return action;
243 }
244
245 /**
246 * Start Coord Jobs
247 *
248 * @throws CommandException thrown if failed to start coord jobs
249 */
250 @SuppressWarnings("unchecked")
251 private void startCoordJobs() throws CommandException {
252 if (bundleJob != null) {
253 try {
254 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
255 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
256 for (Element coordElem : coordElems) {
257 Attribute name = coordElem.getAttribute("name");
258 Configuration coordConf = mergeConfig(coordElem);
259 coordConf.set(OozieClient.BUNDLE_ID, jobId);
260
261 queue(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(), bundleJob.getId(), name.getValue()));
262
263 updateBundleAction(name.getValue());
264 }
265 }
266 catch (JDOMException jex) {
267 throw new CommandException(ErrorCode.E1301, jex);
268 }
269 catch (JPAExecutorException je) {
270 throw new CommandException(je);
271 }
272 }
273 else {
274 throw new CommandException(ErrorCode.E0604, jobId);
275 }
276 }
277
278 private void updateBundleAction(String coordName) throws JPAExecutorException {
279 BundleActionBean action = jpaService.execute(new BundleActionGetJPAExecutor(jobId, coordName));
280 action.incrementAndGetPending();
281 action.setLastModifiedTime(new Date());
282 jpaService.execute(new BundleActionUpdateJPAExecutor(action));
283 }
284
285 /**
286 * Merge Bundle job config and the configuration from the coord job to pass
287 * to Coord Engine
288 *
289 * @param coordElem the coordinator configuration
290 * @return Configuration merged configuration
291 * @throws CommandException thrown if failed to merge configuration
292 */
293 private Configuration mergeConfig(Element coordElem) throws CommandException {
294 String jobConf = bundleJob.getConf();
295 // Step 1: runConf = jobConf
296 Configuration runConf = null;
297 try {
298 runConf = new XConfiguration(new StringReader(jobConf));
299 }
300 catch (IOException e1) {
301 LOG.warn("Configuration parse error in:" + jobConf);
302 throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
303 }
304 // Step 2: Merge local properties into runConf
305 // extract 'property' tags under 'configuration' block in the coordElem
306 // convert Element to XConfiguration
307 Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
308
309 if (localConfigElement != null) {
310 String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
311 Configuration localConf;
312 try {
313 localConf = new XConfiguration(new StringReader(strConfig));
314 }
315 catch (IOException e1) {
316 LOG.warn("Configuration parse error in:" + strConfig);
317 throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
318 }
319
320 // copy configuration properties in the coordElem to the runConf
321 XConfiguration.copy(localConf, runConf);
322 }
323
324 // Step 3: Extract value of 'app-path' in coordElem, save it as a
325 // new property called 'oozie.coord.application.path', and normalize.
326 String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
327 runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
328 // Normalize coordinator appPath here;
329 try {
330 JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
331 }
332 catch (IOException e) {
333 throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
334 }
335 return runConf;
336 }
337
338 /* (non-Javadoc)
339 * @see org.apache.oozie.command.TransitionXCommand#getJob()
340 */
341 @Override
342 public Job getJob() {
343 return bundleJob;
344 }
345
346 /* (non-Javadoc)
347 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
348 */
349 @Override
350 public void updateJob() throws CommandException {
351 try {
352 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
353 }
354 catch (JPAExecutorException je) {
355 throw new CommandException(je);
356 }
357 }
358 }