001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.bundle; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.util.Date; 023 import java.util.HashMap; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Map.Entry; 027 028 import org.apache.hadoop.conf.Configuration; 029 import org.apache.oozie.BundleActionBean; 030 import org.apache.oozie.BundleJobBean; 031 import org.apache.oozie.ErrorCode; 032 import org.apache.oozie.XException; 033 import org.apache.oozie.client.Job; 034 import org.apache.oozie.client.OozieClient; 035 import org.apache.oozie.command.CommandException; 036 import org.apache.oozie.command.PreconditionException; 037 import org.apache.oozie.command.StartTransitionXCommand; 038 import org.apache.oozie.command.coord.CoordSubmitXCommand; 039 import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor; 040 import org.apache.oozie.executor.jpa.BundleActionInsertJPAExecutor; 041 import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor; 042 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 043 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor; 044 import org.apache.oozie.executor.jpa.JPAExecutorException; 045 import org.apache.oozie.service.JPAService; 046 import org.apache.oozie.service.Services; 047 import org.apache.oozie.util.JobUtils; 048 import org.apache.oozie.util.LogUtils; 049 import org.apache.oozie.util.ParamChecker; 050 import org.apache.oozie.util.XConfiguration; 051 import org.apache.oozie.util.XmlUtils; 052 import org.jdom.Attribute; 053 import org.jdom.Element; 054 import org.jdom.JDOMException; 055 056 /** 057 * The command to start Bundle job 058 */ 059 public class BundleStartXCommand extends StartTransitionXCommand { 060 private final String jobId; 061 private BundleJobBean bundleJob; 062 private JPAService jpaService = null; 063 064 /** 065 * The constructor for class {@link BundleStartXCommand} 066 * 067 * @param jobId the bundle job id 068 */ 069 public BundleStartXCommand(String jobId) { 070 super("bundle_start", "bundle_start", 1); 071 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 072 } 073 074 /** 075 * The constructor for class {@link BundleStartXCommand} 076 * 077 * @param jobId the bundle job id 078 * @param dryrun true if dryrun is enable 079 */ 080 public BundleStartXCommand(String jobId, boolean dryrun) { 081 super("bundle_start", "bundle_start", 1, dryrun); 082 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 083 } 084 085 /* (non-Javadoc) 086 * @see org.apache.oozie.command.XCommand#getEntityKey() 087 */ 088 @Override 089 public String getEntityKey() { 090 return jobId; 091 } 092 093 /* (non-Javadoc) 094 * @see org.apache.oozie.command.XCommand#isLockRequired() 095 */ 096 @Override 097 protected boolean isLockRequired() { 098 return true; 099 } 100 101 /* (non-Javadoc) 102 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 103 */ 104 @Override 105 protected void verifyPrecondition() throws CommandException, PreconditionException { 106 eagerVerifyPrecondition(); 107 } 108 109 /* (non-Javadoc) 110 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition() 111 */ 112 @Override 113 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 114 if (bundleJob.getStatus() != Job.Status.PREP) { 115 String msg = "Bundle " + bundleJob.getId() + " is not in PREP status. It is in : " + bundleJob.getStatus(); 116 LOG.info(msg); 117 throw new PreconditionException(ErrorCode.E1100, msg); 118 } 119 } 120 /* (non-Javadoc) 121 * @see org.apache.oozie.command.XCommand#loadState() 122 */ 123 @Override 124 public void loadState() throws CommandException { 125 eagerLoadState(); 126 } 127 128 /* (non-Javadoc) 129 * @see org.apache.oozie.command.XCommand#eagerLoadState() 130 */ 131 @Override 132 public void eagerLoadState() throws CommandException { 133 try { 134 jpaService = Services.get().get(JPAService.class); 135 136 if (jpaService != null) { 137 this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId)); 138 LogUtils.setLogInfo(bundleJob, logInfo); 139 super.setJob(bundleJob); 140 141 } 142 else { 143 throw new CommandException(ErrorCode.E0610); 144 } 145 } 146 catch (XException ex) { 147 throw new CommandException(ex); 148 } 149 } 150 151 /* (non-Javadoc) 152 * @see org.apache.oozie.command.StartTransitionXCommand#StartChildren() 153 */ 154 @Override 155 public void StartChildren() throws CommandException { 156 LOG.debug("Started coord jobs for the bundle=[{0}]", jobId); 157 insertBundleActions(); 158 startCoordJobs(); 159 LOG.debug("Ended coord jobs for the bundle=[{0}]", jobId); 160 } 161 162 /* (non-Javadoc) 163 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 164 */ 165 @Override 166 public void notifyParent() { 167 } 168 169 /** 170 * Insert bundle actions 171 * 172 * @throws CommandException thrown if failed to create bundle actions 173 */ 174 @SuppressWarnings("unchecked") 175 private void insertBundleActions() throws CommandException { 176 if (bundleJob != null) { 177 Map<String, Boolean> map = new HashMap<String, Boolean>(); 178 try { 179 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml()); 180 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 181 for (Element elem : coordElems) { 182 Attribute name = elem.getAttribute("name"); 183 Attribute critical = elem.getAttribute("critical"); 184 if (name != null) { 185 if (map.containsKey(name.getValue())) { 186 throw new CommandException(ErrorCode.E1304, name); 187 } 188 boolean isCritical = false; 189 if (critical != null && Boolean.parseBoolean(critical.getValue())) { 190 isCritical = true; 191 } 192 map.put(name.getValue(), isCritical); 193 } 194 else { 195 throw new CommandException(ErrorCode.E1305); 196 } 197 } 198 } 199 catch (JDOMException jex) { 200 throw new CommandException(ErrorCode.E1301, jex); 201 } 202 203 try { 204 // if there is no coordinator for this bundle, failed it. 205 if (map.isEmpty()) { 206 bundleJob.setStatus(Job.Status.FAILED); 207 bundleJob.resetPending(); 208 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob)); 209 LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId); 210 throw new CommandException(ErrorCode.E1318, jobId); 211 } 212 213 for (Entry<String, Boolean> coordName : map.entrySet()) { 214 BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue()); 215 216 jpaService.execute(new BundleActionInsertJPAExecutor(action)); 217 } 218 } 219 catch (JPAExecutorException je) { 220 throw new CommandException(je); 221 } 222 223 } 224 else { 225 throw new CommandException(ErrorCode.E0604, jobId); 226 } 227 } 228 229 private BundleActionBean createBundleAction(String jobId, String coordName, boolean isCritical) { 230 BundleActionBean action = new BundleActionBean(); 231 action.setBundleActionId(jobId + "_" + coordName); 232 action.setBundleId(jobId); 233 action.setCoordName(coordName); 234 action.setStatus(Job.Status.PREP); 235 action.setLastModifiedTime(new Date()); 236 if (isCritical) { 237 action.setCritical(); 238 } 239 else { 240 action.resetCritical(); 241 } 242 return action; 243 } 244 245 /** 246 * Start Coord Jobs 247 * 248 * @throws CommandException thrown if failed to start coord jobs 249 */ 250 @SuppressWarnings("unchecked") 251 private void startCoordJobs() throws CommandException { 252 if (bundleJob != null) { 253 try { 254 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml()); 255 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 256 for (Element coordElem : coordElems) { 257 Attribute name = coordElem.getAttribute("name"); 258 Configuration coordConf = mergeConfig(coordElem); 259 coordConf.set(OozieClient.BUNDLE_ID, jobId); 260 261 queue(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(), bundleJob.getId(), name.getValue())); 262 263 updateBundleAction(name.getValue()); 264 } 265 } 266 catch (JDOMException jex) { 267 throw new CommandException(ErrorCode.E1301, jex); 268 } 269 catch (JPAExecutorException je) { 270 throw new CommandException(je); 271 } 272 } 273 else { 274 throw new CommandException(ErrorCode.E0604, jobId); 275 } 276 } 277 278 private void updateBundleAction(String coordName) throws JPAExecutorException { 279 BundleActionBean action = jpaService.execute(new BundleActionGetJPAExecutor(jobId, coordName)); 280 action.incrementAndGetPending(); 281 action.setLastModifiedTime(new Date()); 282 jpaService.execute(new BundleActionUpdateJPAExecutor(action)); 283 } 284 285 /** 286 * Merge Bundle job config and the configuration from the coord job to pass 287 * to Coord Engine 288 * 289 * @param coordElem the coordinator configuration 290 * @return Configuration merged configuration 291 * @throws CommandException thrown if failed to merge configuration 292 */ 293 private Configuration mergeConfig(Element coordElem) throws CommandException { 294 String jobConf = bundleJob.getConf(); 295 // Step 1: runConf = jobConf 296 Configuration runConf = null; 297 try { 298 runConf = new XConfiguration(new StringReader(jobConf)); 299 } 300 catch (IOException e1) { 301 LOG.warn("Configuration parse error in:" + jobConf); 302 throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1); 303 } 304 // Step 2: Merge local properties into runConf 305 // extract 'property' tags under 'configuration' block in the coordElem 306 // convert Element to XConfiguration 307 Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace()); 308 309 if (localConfigElement != null) { 310 String strConfig = XmlUtils.prettyPrint(localConfigElement).toString(); 311 Configuration localConf; 312 try { 313 localConf = new XConfiguration(new StringReader(strConfig)); 314 } 315 catch (IOException e1) { 316 LOG.warn("Configuration parse error in:" + strConfig); 317 throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1); 318 } 319 320 // copy configuration properties in the coordElem to the runConf 321 XConfiguration.copy(localConf, runConf); 322 } 323 324 // Step 3: Extract value of 'app-path' in coordElem, save it as a 325 // new property called 'oozie.coord.application.path', and normalize. 326 String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue(); 327 runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath); 328 // Normalize coordinator appPath here; 329 try { 330 JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf); 331 } 332 catch (IOException e) { 333 throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH)); 334 } 335 return runConf; 336 } 337 338 /* (non-Javadoc) 339 * @see org.apache.oozie.command.TransitionXCommand#getJob() 340 */ 341 @Override 342 public Job getJob() { 343 return bundleJob; 344 } 345 346 /* (non-Javadoc) 347 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 348 */ 349 @Override 350 public void updateJob() throws CommandException { 351 try { 352 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob)); 353 } 354 catch (JPAExecutorException je) { 355 throw new CommandException(je); 356 } 357 } 358 }