This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.Date;
023 import java.util.HashMap;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Map.Entry;
027
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.oozie.BundleActionBean;
030 import org.apache.oozie.BundleJobBean;
031 import org.apache.oozie.ErrorCode;
032 import org.apache.oozie.XException;
033 import org.apache.oozie.client.Job;
034 import org.apache.oozie.client.OozieClient;
035 import org.apache.oozie.client.rest.JsonBean;
036 import org.apache.oozie.command.CommandException;
037 import org.apache.oozie.command.PreconditionException;
038 import org.apache.oozie.command.StartTransitionXCommand;
039 import org.apache.oozie.command.coord.CoordSubmitXCommand;
040 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
041 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
042 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
043 import org.apache.oozie.executor.jpa.JPAExecutorException;
044 import org.apache.oozie.service.JPAService;
045 import org.apache.oozie.service.Services;
046 import org.apache.oozie.util.JobUtils;
047 import org.apache.oozie.util.LogUtils;
048 import org.apache.oozie.util.ParamChecker;
049 import org.apache.oozie.util.XConfiguration;
050 import org.apache.oozie.util.XmlUtils;
051 import org.jdom.Attribute;
052 import org.jdom.Element;
053 import org.jdom.JDOMException;
054
055 /**
056 * The command to start Bundle job
057 */
058 public class BundleStartXCommand extends StartTransitionXCommand {
059 private final String jobId;
060 private BundleJobBean bundleJob;
061 private JPAService jpaService = null;
062
063 /**
064 * The constructor for class {@link BundleStartXCommand}
065 *
066 * @param jobId the bundle job id
067 */
068 public BundleStartXCommand(String jobId) {
069 super("bundle_start", "bundle_start", 1);
070 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
071 }
072
073 /**
074 * The constructor for class {@link BundleStartXCommand}
075 *
076 * @param jobId the bundle job id
077 * @param dryrun true if dryrun is enable
078 */
079 public BundleStartXCommand(String jobId, boolean dryrun) {
080 super("bundle_start", "bundle_start", 1, dryrun);
081 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
082 }
083
084 /* (non-Javadoc)
085 * @see org.apache.oozie.command.XCommand#getEntityKey()
086 */
087 @Override
088 public String getEntityKey() {
089 return jobId;
090 }
091
092 /* (non-Javadoc)
093 * @see org.apache.oozie.command.XCommand#isLockRequired()
094 */
095 @Override
096 protected boolean isLockRequired() {
097 return true;
098 }
099
100 /* (non-Javadoc)
101 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
102 */
103 @Override
104 protected void verifyPrecondition() throws CommandException, PreconditionException {
105 eagerVerifyPrecondition();
106 }
107
108 /* (non-Javadoc)
109 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
110 */
111 @Override
112 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
113 if (bundleJob.getStatus() != Job.Status.PREP) {
114 String msg = "Bundle " + bundleJob.getId() + " is not in PREP status. It is in : " + bundleJob.getStatus();
115 LOG.info(msg);
116 throw new PreconditionException(ErrorCode.E1100, msg);
117 }
118 }
119 /* (non-Javadoc)
120 * @see org.apache.oozie.command.XCommand#loadState()
121 */
122 @Override
123 public void loadState() throws CommandException {
124 eagerLoadState();
125 }
126
127 /* (non-Javadoc)
128 * @see org.apache.oozie.command.XCommand#eagerLoadState()
129 */
130 @Override
131 public void eagerLoadState() throws CommandException {
132 try {
133 jpaService = Services.get().get(JPAService.class);
134
135 if (jpaService != null) {
136 this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
137 LogUtils.setLogInfo(bundleJob, logInfo);
138 super.setJob(bundleJob);
139
140 }
141 else {
142 throw new CommandException(ErrorCode.E0610);
143 }
144 }
145 catch (XException ex) {
146 throw new CommandException(ex);
147 }
148 }
149
150 /* (non-Javadoc)
151 * @see org.apache.oozie.command.StartTransitionXCommand#StartChildren()
152 */
153 @Override
154 public void StartChildren() throws CommandException {
155 LOG.debug("Started coord jobs for the bundle=[{0}]", jobId);
156 insertBundleActions();
157 startCoordJobs();
158 LOG.debug("Ended coord jobs for the bundle=[{0}]", jobId);
159 }
160
161 /* (non-Javadoc)
162 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
163 */
164 @Override
165 public void notifyParent() {
166 }
167
168 /* (non-Javadoc)
169 * @see org.apache.oozie.command.StartTransitionXCommand#performWrites()
170 */
171 @Override
172 public void performWrites() throws CommandException {
173 try {
174 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
175 }
176 catch (JPAExecutorException e) {
177 throw new CommandException(e);
178 }
179 }
180
181 /**
182 * Insert bundle actions
183 *
184 * @throws CommandException thrown if failed to create bundle actions
185 */
186 @SuppressWarnings("unchecked")
187 private void insertBundleActions() throws CommandException {
188 if (bundleJob != null) {
189 Map<String, Boolean> map = new HashMap<String, Boolean>();
190 try {
191 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
192 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
193 for (Element elem : coordElems) {
194 Attribute name = elem.getAttribute("name");
195 Attribute critical = elem.getAttribute("critical");
196 if (name != null) {
197 if (map.containsKey(name.getValue())) {
198 throw new CommandException(ErrorCode.E1304, name);
199 }
200 boolean isCritical = false;
201 if (critical != null && Boolean.parseBoolean(critical.getValue())) {
202 isCritical = true;
203 }
204 map.put(name.getValue(), isCritical);
205 }
206 else {
207 throw new CommandException(ErrorCode.E1305);
208 }
209 }
210 }
211 catch (JDOMException jex) {
212 throw new CommandException(ErrorCode.E1301, jex);
213 }
214
215 // if there is no coordinator for this bundle, failed it.
216 if (map.isEmpty()) {
217 bundleJob.setStatus(Job.Status.FAILED);
218 bundleJob.resetPending();
219 try {
220 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
221 }
222 catch (JPAExecutorException jex) {
223 throw new CommandException(jex);
224 }
225
226 LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId);
227 throw new CommandException(ErrorCode.E1318, jobId);
228 }
229
230 for (Entry<String, Boolean> coordName : map.entrySet()) {
231 BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue());
232 insertList.add(action);
233 }
234 }
235 else {
236 throw new CommandException(ErrorCode.E0604, jobId);
237 }
238 }
239
240 private BundleActionBean createBundleAction(String jobId, String coordName, boolean isCritical) {
241 BundleActionBean action = new BundleActionBean();
242 action.setBundleActionId(jobId + "_" + coordName);
243 action.setBundleId(jobId);
244 action.setCoordName(coordName);
245 action.setStatus(Job.Status.PREP);
246 action.setLastModifiedTime(new Date());
247 if (isCritical) {
248 action.setCritical();
249 }
250 else {
251 action.resetCritical();
252 }
253 return action;
254 }
255
256 /**
257 * Start Coord Jobs
258 *
259 * @throws CommandException thrown if failed to start coord jobs
260 */
261 @SuppressWarnings("unchecked")
262 private void startCoordJobs() throws CommandException {
263 if (bundleJob != null) {
264 try {
265 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
266 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
267 for (Element coordElem : coordElems) {
268 Attribute name = coordElem.getAttribute("name");
269 Configuration coordConf = mergeConfig(coordElem);
270 coordConf.set(OozieClient.BUNDLE_ID, jobId);
271
272 queue(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(), bundleJob.getId(), name.getValue()));
273
274 }
275 updateBundleAction();
276 }
277 catch (JDOMException jex) {
278 throw new CommandException(ErrorCode.E1301, jex);
279 }
280 catch (JPAExecutorException je) {
281 throw new CommandException(je);
282 }
283 }
284 else {
285 throw new CommandException(ErrorCode.E0604, jobId);
286 }
287 }
288
289 private void updateBundleAction() throws JPAExecutorException {
290 for(JsonBean bAction : insertList) {
291 BundleActionBean action = (BundleActionBean) bAction;
292 action.incrementAndGetPending();
293 action.setLastModifiedTime(new Date());
294 }
295 }
296
297 /**
298 * Merge Bundle job config and the configuration from the coord job to pass
299 * to Coord Engine
300 *
301 * @param coordElem the coordinator configuration
302 * @return Configuration merged configuration
303 * @throws CommandException thrown if failed to merge configuration
304 */
305 private Configuration mergeConfig(Element coordElem) throws CommandException {
306 String jobConf = bundleJob.getConf();
307 // Step 1: runConf = jobConf
308 Configuration runConf = null;
309 try {
310 runConf = new XConfiguration(new StringReader(jobConf));
311 }
312 catch (IOException e1) {
313 LOG.warn("Configuration parse error in:" + jobConf);
314 throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
315 }
316 // Step 2: Merge local properties into runConf
317 // extract 'property' tags under 'configuration' block in the coordElem
318 // convert Element to XConfiguration
319 Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
320
321 if (localConfigElement != null) {
322 String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
323 Configuration localConf;
324 try {
325 localConf = new XConfiguration(new StringReader(strConfig));
326 }
327 catch (IOException e1) {
328 LOG.warn("Configuration parse error in:" + strConfig);
329 throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
330 }
331
332 // copy configuration properties in the coordElem to the runConf
333 XConfiguration.copy(localConf, runConf);
334 }
335
336 // Step 3: Extract value of 'app-path' in coordElem, save it as a
337 // new property called 'oozie.coord.application.path', and normalize.
338 String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
339 runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
340 // Normalize coordinator appPath here;
341 try {
342 JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
343 }
344 catch (IOException e) {
345 throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
346 }
347 return runConf;
348 }
349
350 /* (non-Javadoc)
351 * @see org.apache.oozie.command.TransitionXCommand#getJob()
352 */
353 @Override
354 public Job getJob() {
355 return bundleJob;
356 }
357
358 /* (non-Javadoc)
359 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
360 */
361 @Override
362 public void updateJob() throws CommandException {
363 updateList.add(bundleJob);
364 }
365 }