This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.Date;
023 import java.util.HashMap;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Map.Entry;
027
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.oozie.BundleActionBean;
030 import org.apache.oozie.BundleJobBean;
031 import org.apache.oozie.ErrorCode;
032 import org.apache.oozie.XException;
033 import org.apache.oozie.client.Job;
034 import org.apache.oozie.client.OozieClient;
035 import org.apache.oozie.client.rest.JsonBean;
036 import org.apache.oozie.command.CommandException;
037 import org.apache.oozie.command.PreconditionException;
038 import org.apache.oozie.command.StartTransitionXCommand;
039 import org.apache.oozie.command.coord.CoordSubmitXCommand;
040 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
041 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
042 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
043 import org.apache.oozie.executor.jpa.JPAExecutorException;
044 import org.apache.oozie.service.JPAService;
045 import org.apache.oozie.service.Services;
046 import org.apache.oozie.util.JobUtils;
047 import org.apache.oozie.util.LogUtils;
048 import org.apache.oozie.util.ParamChecker;
049 import org.apache.oozie.util.XConfiguration;
050 import org.apache.oozie.util.XmlUtils;
051 import org.jdom.Attribute;
052 import org.jdom.Element;
053 import org.jdom.JDOMException;
054
055 /**
056 * The command to start Bundle job
057 */
058 public class BundleStartXCommand extends StartTransitionXCommand {
059 private final String jobId;
060 private BundleJobBean bundleJob;
061 private JPAService jpaService = null;
062
063 /**
064 * The constructor for class {@link BundleStartXCommand}
065 *
066 * @param jobId the bundle job id
067 */
068 public BundleStartXCommand(String jobId) {
069 super("bundle_start", "bundle_start", 1);
070 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
071 }
072
073 /**
074 * The constructor for class {@link BundleStartXCommand}
075 *
076 * @param jobId the bundle job id
077 * @param dryrun true if dryrun is enable
078 */
079 public BundleStartXCommand(String jobId, boolean dryrun) {
080 super("bundle_start", "bundle_start", 1, dryrun);
081 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
082 }
083
084 /* (non-Javadoc)
085 * @see org.apache.oozie.command.XCommand#getEntityKey()
086 */
087 @Override
088 public String getEntityKey() {
089 return jobId;
090 }
091
092 @Override
093 public String getKey() {
094 return getName() + "_" + jobId;
095 }
096
097 /* (non-Javadoc)
098 * @see org.apache.oozie.command.XCommand#isLockRequired()
099 */
100 @Override
101 protected boolean isLockRequired() {
102 return true;
103 }
104
105 /* (non-Javadoc)
106 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
107 */
108 @Override
109 protected void verifyPrecondition() throws CommandException, PreconditionException {
110 eagerVerifyPrecondition();
111 }
112
113 /* (non-Javadoc)
114 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
115 */
116 @Override
117 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
118 if (bundleJob.getStatus() != Job.Status.PREP) {
119 String msg = "Bundle " + bundleJob.getId() + " is not in PREP status. It is in : " + bundleJob.getStatus();
120 LOG.info(msg);
121 throw new PreconditionException(ErrorCode.E1100, msg);
122 }
123 }
124 /* (non-Javadoc)
125 * @see org.apache.oozie.command.XCommand#loadState()
126 */
127 @Override
128 public void loadState() throws CommandException {
129 eagerLoadState();
130 }
131
132 /* (non-Javadoc)
133 * @see org.apache.oozie.command.XCommand#eagerLoadState()
134 */
135 @Override
136 public void eagerLoadState() throws CommandException {
137 try {
138 jpaService = Services.get().get(JPAService.class);
139
140 if (jpaService != null) {
141 this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
142 LogUtils.setLogInfo(bundleJob, logInfo);
143 super.setJob(bundleJob);
144
145 }
146 else {
147 throw new CommandException(ErrorCode.E0610);
148 }
149 }
150 catch (XException ex) {
151 throw new CommandException(ex);
152 }
153 }
154
155 /* (non-Javadoc)
156 * @see org.apache.oozie.command.StartTransitionXCommand#StartChildren()
157 */
158 @Override
159 public void StartChildren() throws CommandException {
160 LOG.debug("Started coord jobs for the bundle=[{0}]", jobId);
161 insertBundleActions();
162 startCoordJobs();
163 LOG.debug("Ended coord jobs for the bundle=[{0}]", jobId);
164 }
165
166 /* (non-Javadoc)
167 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
168 */
169 @Override
170 public void notifyParent() {
171 }
172
173 /* (non-Javadoc)
174 * @see org.apache.oozie.command.StartTransitionXCommand#performWrites()
175 */
176 @Override
177 public void performWrites() throws CommandException {
178 try {
179 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
180 }
181 catch (JPAExecutorException e) {
182 throw new CommandException(e);
183 }
184 }
185
186 /**
187 * Insert bundle actions
188 *
189 * @throws CommandException thrown if failed to create bundle actions
190 */
191 @SuppressWarnings("unchecked")
192 private void insertBundleActions() throws CommandException {
193 if (bundleJob != null) {
194 Map<String, Boolean> map = new HashMap<String, Boolean>();
195 try {
196 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
197 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
198 for (Element elem : coordElems) {
199 Attribute name = elem.getAttribute("name");
200 Attribute critical = elem.getAttribute("critical");
201 if (name != null) {
202 if (map.containsKey(name.getValue())) {
203 throw new CommandException(ErrorCode.E1304, name);
204 }
205 boolean isCritical = false;
206 if (critical != null && Boolean.parseBoolean(critical.getValue())) {
207 isCritical = true;
208 }
209 map.put(name.getValue(), isCritical);
210 }
211 else {
212 throw new CommandException(ErrorCode.E1305);
213 }
214 }
215 }
216 catch (JDOMException jex) {
217 throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
218 }
219
220 // if there is no coordinator for this bundle, failed it.
221 if (map.isEmpty()) {
222 bundleJob.setStatus(Job.Status.FAILED);
223 bundleJob.resetPending();
224 try {
225 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
226 }
227 catch (JPAExecutorException jex) {
228 throw new CommandException(jex);
229 }
230
231 LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId);
232 throw new CommandException(ErrorCode.E1318, jobId);
233 }
234
235 for (Entry<String, Boolean> coordName : map.entrySet()) {
236 BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue());
237 insertList.add(action);
238 }
239 }
240 else {
241 throw new CommandException(ErrorCode.E0604, jobId);
242 }
243 }
244
245 private BundleActionBean createBundleAction(String jobId, String coordName, boolean isCritical) {
246 BundleActionBean action = new BundleActionBean();
247 action.setBundleActionId(jobId + "_" + coordName);
248 action.setBundleId(jobId);
249 action.setCoordName(coordName);
250 action.setStatus(Job.Status.PREP);
251 action.setLastModifiedTime(new Date());
252 if (isCritical) {
253 action.setCritical();
254 }
255 else {
256 action.resetCritical();
257 }
258 return action;
259 }
260
261 /**
262 * Start Coord Jobs
263 *
264 * @throws CommandException thrown if failed to start coord jobs
265 */
266 @SuppressWarnings("unchecked")
267 private void startCoordJobs() throws CommandException {
268 if (bundleJob != null) {
269 try {
270 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
271 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
272 for (Element coordElem : coordElems) {
273 Attribute name = coordElem.getAttribute("name");
274 Configuration coordConf = mergeConfig(coordElem);
275 coordConf.set(OozieClient.BUNDLE_ID, jobId);
276
277 queue(new CoordSubmitXCommand(coordConf, bundleJob.getId(), name.getValue()));
278
279 }
280 updateBundleAction();
281 }
282 catch (JDOMException jex) {
283 throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
284 }
285 catch (JPAExecutorException je) {
286 throw new CommandException(je);
287 }
288 }
289 else {
290 throw new CommandException(ErrorCode.E0604, jobId);
291 }
292 }
293
294 private void updateBundleAction() throws JPAExecutorException {
295 for(JsonBean bAction : insertList) {
296 BundleActionBean action = (BundleActionBean) bAction;
297 action.incrementAndGetPending();
298 action.setLastModifiedTime(new Date());
299 }
300 }
301
302 /**
303 * Merge Bundle job config and the configuration from the coord job to pass
304 * to Coord Engine
305 *
306 * @param coordElem the coordinator configuration
307 * @return Configuration merged configuration
308 * @throws CommandException thrown if failed to merge configuration
309 */
310 private Configuration mergeConfig(Element coordElem) throws CommandException {
311 String jobConf = bundleJob.getConf();
312 // Step 1: runConf = jobConf
313 Configuration runConf = null;
314 try {
315 runConf = new XConfiguration(new StringReader(jobConf));
316 }
317 catch (IOException e1) {
318 LOG.warn("Configuration parse error in:" + jobConf);
319 throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
320 }
321 // Step 2: Merge local properties into runConf
322 // extract 'property' tags under 'configuration' block in the coordElem
323 // convert Element to XConfiguration
324 Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
325
326 if (localConfigElement != null) {
327 String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
328 Configuration localConf;
329 try {
330 localConf = new XConfiguration(new StringReader(strConfig));
331 }
332 catch (IOException e1) {
333 LOG.warn("Configuration parse error in:" + strConfig);
334 throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
335 }
336
337 // copy configuration properties in the coordElem to the runConf
338 XConfiguration.copy(localConf, runConf);
339 }
340
341 // Step 3: Extract value of 'app-path' in coordElem, save it as a
342 // new property called 'oozie.coord.application.path', and normalize.
343 String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
344 runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
345 // Normalize coordinator appPath here;
346 try {
347 JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
348 }
349 catch (IOException e) {
350 throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
351 }
352 return runConf;
353 }
354
355 /* (non-Javadoc)
356 * @see org.apache.oozie.command.TransitionXCommand#getJob()
357 */
358 @Override
359 public Job getJob() {
360 return bundleJob;
361 }
362
363 /* (non-Javadoc)
364 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
365 */
366 @Override
367 public void updateJob() throws CommandException {
368 updateList.add(bundleJob);
369 }
370 }