/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.bundle;

import java.io.IOException;
import java.io.StringReader;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.StartTransitionXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Attribute;
import org.jdom.Element;
import org.jdom.JDOMException;

/**
 * The command to start Bundle job.
 * <p>
 * The command loads the bundle job through the {@link JPAService}, requires it to be in
 * {@link Job.Status#PREP}, creates one {@link BundleActionBean} per {@code coordinator}
 * element of the bundle XML and queues a {@link CoordSubmitXCommand} for each of them.
 */
public class BundleStartXCommand extends StartTransitionXCommand {
    private final String jobId;
    private BundleJobBean bundleJob;
    private JPAService jpaService = null;

    /**
     * The constructor for class {@link BundleStartXCommand}
     *
     * @param jobId the bundle job id; validated with {@link ParamChecker#notEmpty}
     */
    public BundleStartXCommand(String jobId) {
        // NOTE(review): the super arguments are presumably (name, type, priority) - confirm
        // against StartTransitionXCommand.
        super("bundle_start", "bundle_start", 1);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
    }

    /**
     * The constructor for class {@link BundleStartXCommand}
     *
     * @param jobId the bundle job id; validated with {@link ParamChecker#notEmpty}
     * @param dryrun true if dryrun is enable
     */
    public BundleStartXCommand(String jobId, boolean dryrun) {
        super("bundle_start", "bundle_start", 1, dryrun);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return jobId;
    }

    /**
     * Returns the command name and the bundle job id joined by an underscore.
     *
     * @return the key of this command
     */
    @Override
    public String getKey() {
        return getName() + "_" + jobId;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        // Same check as the eager variant.
        eagerVerifyPrecondition();
    }

    /**
     * Verifies that the loaded bundle job is in {@link Job.Status#PREP}.
     *
     * @throws CommandException declared by the overridden method
     * @throws PreconditionException with {@link ErrorCode#E1100} if the job is in any other status
     * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
     */
    @Override
    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
        if (bundleJob.getStatus() != Job.Status.PREP) {
            String msg = "Bundle " + bundleJob.getId() + " is not in PREP status. It is in : " + bundleJob.getStatus();
            LOG.info(msg);
            throw new PreconditionException(ErrorCode.E1100, msg);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    public void loadState() throws CommandException {
        // Same loading as the eager variant.
        eagerLoadState();
    }

    /**
     * Looks up the {@link JPAService}, loads the bundle job for {@code jobId}, sets the log info
     * from it and registers it with the super class through {@code setJob}.
     *
     * @throws CommandException with {@link ErrorCode#E0610} if no {@link JPAService} is available,
     *         or wrapping any {@link XException} raised while loading
     * @see org.apache.oozie.command.XCommand#eagerLoadState()
     */
    @Override
    public void eagerLoadState() throws CommandException {
        try {
            jpaService = Services.get().get(JPAService.class);

            if (jpaService != null) {
                this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
                LogUtils.setLogInfo(bundleJob, logInfo);
                super.setJob(bundleJob);
            }
            else {
                throw new CommandException(ErrorCode.E0610);
            }
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Creates the bundle actions and then queues the coordinator submissions.
     *
     * @throws CommandException thrown if the bundle actions cannot be created or the coord jobs
     *         cannot be started
     * @see org.apache.oozie.command.StartTransitionXCommand#StartChildren()
     */
    @Override
    public void StartChildren() throws CommandException {
        LOG.debug("Started coord jobs for the bundle=[{0}]", jobId);
        insertBundleActions();
        startCoordJobs();
        LOG.debug("Ended coord jobs for the bundle=[{0}]", jobId);
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
     */
    @Override
    public void notifyParent() {
        // Intentionally empty: this command performs no parent notification.
    }

    /**
     * Executes a single {@link BulkUpdateInsertJPAExecutor} over the inherited
     * {@code updateList} and {@code insertList}.
     *
     * @throws CommandException wrapping any {@link JPAExecutorException}
     * @see org.apache.oozie.command.StartTransitionXCommand#performWrites()
     */
    @Override
    public void performWrites() throws CommandException {
        try {
            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
    }

    /**
     * Parses the bundle job XML and returns its {@code coordinator} child elements.
     *
     * @return the coordinator elements found directly under the bundle root element
     * @throws CommandException with {@link ErrorCode#E1301} if the bundle XML cannot be parsed
     */
    @SuppressWarnings("unchecked")
    private List<Element> parseCoordinatorElements() throws CommandException {
        try {
            Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
            // Element.getChildren returns a raw List here, hence the unchecked suppression.
            return bAppXml.getChildren("coordinator", bAppXml.getNamespace());
        }
        catch (JDOMException jex) {
            throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
        }
    }

    /**
     * Insert bundle actions
     * <p>
     * Collects the name and the {@code critical} flag of every coordinator element, fails the
     * bundle job if there is none, and otherwise adds one {@link BundleActionBean} per
     * coordinator to the inherited {@code insertList}.
     *
     * @throws CommandException thrown if failed to create bundle actions: {@link ErrorCode#E0604}
     *         if no bundle job is loaded, {@link ErrorCode#E1304} on a duplicate coordinator
     *         name, {@link ErrorCode#E1305} on a coordinator without a name,
     *         {@link ErrorCode#E1318} if the bundle has no coordinator at all
     */
    private void insertBundleActions() throws CommandException {
        if (bundleJob == null) {
            throw new CommandException(ErrorCode.E0604, jobId);
        }

        // coordinator name -> critical flag
        Map<String, Boolean> criticalByCoordName = new HashMap<String, Boolean>();
        for (Element elem : parseCoordinatorElements()) {
            Attribute name = elem.getAttribute("name");
            Attribute critical = elem.getAttribute("critical");
            if (name == null) {
                throw new CommandException(ErrorCode.E1305);
            }
            String coordName = name.getValue();
            if (criticalByCoordName.containsKey(coordName)) {
                // Report the plain coordinator name rather than the Attribute object.
                throw new CommandException(ErrorCode.E1304, coordName);
            }
            boolean isCritical = critical != null && Boolean.parseBoolean(critical.getValue());
            criticalByCoordName.put(coordName, isCritical);
        }

        // if there is no coordinator for this bundle, failed it.
        if (criticalByCoordName.isEmpty()) {
            bundleJob.setStatus(Job.Status.FAILED);
            bundleJob.resetPending();
            try {
                // Persist the FAILED status right away; the exception below aborts the command.
                jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
            }
            catch (JPAExecutorException jex) {
                throw new CommandException(jex);
            }

            LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId);
            throw new CommandException(ErrorCode.E1318, jobId);
        }

        for (Entry<String, Boolean> coordEntry : criticalByCoordName.entrySet()) {
            BundleActionBean action = createBundleAction(jobId, coordEntry.getKey(), coordEntry.getValue());
            insertList.add(action);
        }
    }

    /**
     * Builds a bundle action in {@link Job.Status#PREP} for one coordinator.
     *
     * @param jobId the bundle job id
     * @param coordName the coordinator name; the action id is {@code jobId + "_" + coordName}
     * @param isCritical whether the critical flag is set or reset on the action
     * @return the new bundle action with its last modified time set to the current time
     */
    private BundleActionBean createBundleAction(String jobId, String coordName, boolean isCritical) {
        BundleActionBean action = new BundleActionBean();
        action.setBundleActionId(jobId + "_" + coordName);
        action.setBundleId(jobId);
        action.setCoordName(coordName);
        action.setStatus(Job.Status.PREP);
        action.setLastModifiedTime(new Date());
        if (isCritical) {
            action.setCritical();
        }
        else {
            action.resetCritical();
        }
        return action;
    }

    /**
     * Start Coord Jobs
     * <p>
     * Queues one {@link CoordSubmitXCommand} per coordinator element, each with the merged
     * configuration carrying the bundle id, and then marks the bundle actions as pending.
     *
     * @throws CommandException thrown if failed to start coord jobs: {@link ErrorCode#E0604} if
     *         no bundle job is loaded, {@link ErrorCode#E1301} if the bundle XML cannot be parsed
     */
    private void startCoordJobs() throws CommandException {
        if (bundleJob == null) {
            throw new CommandException(ErrorCode.E0604, jobId);
        }

        try {
            for (Element coordElem : parseCoordinatorElements()) {
                // insertBundleActions(), called first by StartChildren(), rejects coordinators
                // without a name attribute, so name is not expected to be null here.
                Attribute name = coordElem.getAttribute("name");
                Configuration coordConf = mergeConfig(coordElem);
                coordConf.set(OozieClient.BUNDLE_ID, jobId);

                queue(new CoordSubmitXCommand(coordConf, bundleJob.getId(), name.getValue()));
            }
            updateBundleAction();
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
    }

    /**
     * Increments the pending counter and refreshes the last modified time of every bean in the
     * inherited {@code insertList}; every entry is cast to {@link BundleActionBean}.
     *
     * @throws JPAExecutorException declared for callers; nothing in the body throws it
     */
    private void updateBundleAction() throws JPAExecutorException {
        for (JsonBean bAction : insertList) {
            BundleActionBean action = (BundleActionBean) bAction;
            action.incrementAndGetPending();
            action.setLastModifiedTime(new Date());
        }
    }

    /**
     * Merge Bundle job config and the configuration from the coord job to pass
     * to Coord Engine
     *
     * @param coordElem the coordinator configuration
     * @return Configuration merged configuration
     * @throws CommandException thrown if failed to merge configuration: {@link ErrorCode#E1306}
     *         if the bundle job configuration cannot be parsed, {@link ErrorCode#E1307} if the
     *         coordinator's local configuration cannot be parsed, {@link ErrorCode#E1001} if the
     *         application path cannot be normalized
     */
    private Configuration mergeConfig(Element coordElem) throws CommandException {
        String jobConf = bundleJob.getConf();
        // Step 1: runConf = jobConf
        Configuration runConf = null;
        try {
            runConf = new XConfiguration(new StringReader(jobConf));
        }
        catch (IOException e1) {
            LOG.warn("Configuration parse error in:" + jobConf);
            throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
        }
        // Step 2: Merge local properties into runConf
        // extract 'property' tags under 'configuration' block in the coordElem
        // convert Element to XConfiguration
        Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());

        if (localConfigElement != null) {
            String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
            Configuration localConf;
            try {
                localConf = new XConfiguration(new StringReader(strConfig));
            }
            catch (IOException e1) {
                LOG.warn("Configuration parse error in:" + strConfig);
                throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
            }

            // copy configuration properties in the coordElem to the runConf
            XConfiguration.copy(localConf, runConf);
        }

        // Step 3: Extract value of 'app-path' in coordElem, save it as a
        // new property called 'oozie.coord.application.path', and normalize.
        // NOTE(review): getChild returns null when 'app-path' is absent, which would raise a
        // NullPointerException here - presumably schema validation at submit time guarantees
        // the element; confirm.
        String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
        runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
        // Normalize coordinator appPath here;
        try {
            JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
        }
        catch (IOException e) {
            // Pass the cause as the trailing argument, like the other catch blocks of this class.
            throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH), e);
        }
        return runConf;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return bundleJob;
    }

    /**
     * Adds the bundle job to the inherited {@code updateList}.
     *
     * @throws CommandException declared by the overridden method
     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
     */
    @Override
    public void updateJob() throws CommandException {
        updateList.add(bundleJob);
    }
}