/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.bundle;

import java.io.IOException;
import java.io.StringReader;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.StartTransitionXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Attribute;
import org.jdom.Element;
import org.jdom.JDOMException;

/**
 * The command to start a Bundle job.
 * <p>
 * The command loads the bundle job identified by {@code jobId}, requires it to be in
 * {@link Job.Status#PREP}, creates one {@link BundleActionBean} per {@code coordinator}
 * element found in the bundle job XML and queues a {@link CoordSubmitXCommand} for each
 * of those coordinators.
 */
public class BundleStartXCommand extends StartTransitionXCommand {
    /** Id of the bundle job this command starts; validated as non-empty by the constructors. */
    private final String jobId;
    /** Bundle job bean loaded in {@link #eagerLoadState()}. */
    private BundleJobBean bundleJob;
    /** JPA service looked up in {@link #eagerLoadState()}; used for all persistence calls. */
    private JPAService jpaService = null;

    /**
     * The constructor for class {@link BundleStartXCommand}
     *
     * @param jobId the bundle job id
     */
    public BundleStartXCommand(String jobId) {
        super("bundle_start", "bundle_start", 1);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
    }

    /**
     * The constructor for class {@link BundleStartXCommand}
     *
     * @param jobId the bundle job id
     * @param dryrun true if dryrun is enabled
     */
    public BundleStartXCommand(String jobId, boolean dryrun) {
        super("bundle_start", "bundle_start", 1, dryrun);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
    }

    /**
     * Returns the bundle job id as the entity key of this command.
     *
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return jobId;
    }

    /**
     * This command always requires a lock.
     *
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Delegates to {@link #eagerVerifyPrecondition()}.
     *
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        eagerVerifyPrecondition();
    }

    /**
     * Verifies that the loaded bundle job is in {@link Job.Status#PREP}.
     *
     * @throws PreconditionException (E1100) if the bundle job is in any other status
     * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
     */
    @Override
    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
        if (bundleJob.getStatus() != Job.Status.PREP) {
            String msg = "Bundle " + bundleJob.getId() + " is not in PREP status. It is in : " + bundleJob.getStatus();
            LOG.info(msg);
            throw new PreconditionException(ErrorCode.E1100, msg);
        }
    }

    /**
     * Delegates to {@link #eagerLoadState()}.
     *
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    public void loadState() throws CommandException {
        eagerLoadState();
    }

    /**
     * Looks up the {@link JPAService}, loads the bundle job bean for {@code jobId},
     * sets the log info from it and registers it with the superclass via {@code setJob}.
     *
     * @throws CommandException E0610 if the JPA service is not available, or a wrapper
     *         around any {@link XException} raised while loading the job
     * @see org.apache.oozie.command.XCommand#eagerLoadState()
     */
    @Override
    public void eagerLoadState() throws CommandException {
        try {
            jpaService = Services.get().get(JPAService.class);

            if (jpaService != null) {
                this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
                LogUtils.setLogInfo(bundleJob, logInfo);
                super.setJob(bundleJob);
            }
            else {
                throw new CommandException(ErrorCode.E0610);
            }
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Creates the bundle actions and then queues the coordinator submissions.
     *
     * @throws CommandException thrown if either step fails
     * @see org.apache.oozie.command.StartTransitionXCommand#StartChildren()
     */
    @Override
    public void StartChildren() throws CommandException {
        LOG.debug("Started coord jobs for the bundle=[{0}]", jobId);
        insertBundleActions();
        startCoordJobs();
        LOG.debug("Ended coord jobs for the bundle=[{0}]", jobId);
    }

    /**
     * Intentionally empty: this command does not notify a parent.
     *
     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
     */
    @Override
    public void notifyParent() {
    }

    /**
     * Persists the collected updates and inserts through a single
     * {@link BulkUpdateInsertJPAExecutor} call.
     *
     * @throws CommandException wrapping any {@link JPAExecutorException}
     * @see org.apache.oozie.command.StartTransitionXCommand#performWrites()
     */
    @Override
    public void performWrites() throws CommandException {
        try {
            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
    }

    /**
     * Insert bundle actions.
     * <p>
     * Parses the bundle job XML, collects every {@code coordinator} element's
     * {@code name} and {@code critical} attributes, and adds one {@link BundleActionBean}
     * per coordinator to {@code insertList}. If the bundle contains no coordinator at all
     * the bundle job is marked FAILED and persisted immediately before the error is thrown.
     *
     * @throws CommandException thrown if failed to create bundle actions: E1304 for a
     *         duplicate coordinator name, E1305 for a coordinator without a name, E1301
     *         if the bundle XML cannot be parsed, E1318 if there is no coordinator,
     *         E0604 if no bundle job has been loaded
     */
    @SuppressWarnings("unchecked")
    private void insertBundleActions() throws CommandException {
        if (bundleJob != null) {
            // coordinator name -> critical flag
            Map<String, Boolean> map = new HashMap<String, Boolean>();
            try {
                Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
                for (Element elem : coordElems) {
                    Attribute name = elem.getAttribute("name");
                    Attribute critical = elem.getAttribute("critical");
                    if (name != null) {
                        if (map.containsKey(name.getValue())) {
                            // Report the coordinator name itself, not the JDOM Attribute object.
                            throw new CommandException(ErrorCode.E1304, name.getValue());
                        }
                        boolean isCritical = false;
                        if (critical != null && Boolean.parseBoolean(critical.getValue())) {
                            isCritical = true;
                        }
                        map.put(name.getValue(), isCritical);
                    }
                    else {
                        throw new CommandException(ErrorCode.E1305);
                    }
                }
            }
            catch (JDOMException jex) {
                throw new CommandException(ErrorCode.E1301, jex);
            }

            // if there is no coordinator for this bundle, failed it.
            if (map.isEmpty()) {
                bundleJob.setStatus(Job.Status.FAILED);
                bundleJob.resetPending();
                try {
                    jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
                }
                catch (JPAExecutorException jex) {
                    throw new CommandException(jex);
                }

                LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId);
                throw new CommandException(ErrorCode.E1318, jobId);
            }

            for (Entry<String, Boolean> coordName : map.entrySet()) {
                BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue());
                insertList.add(action);
            }
        }
        else {
            throw new CommandException(ErrorCode.E0604, jobId);
        }
    }

    /**
     * Builds a new bundle action in PREP status for one coordinator of the bundle.
     *
     * @param jobId the bundle job id
     * @param coordName the coordinator name; combined with the job id to form the action id
     * @param isCritical whether the coordinator is flagged critical in the bundle XML
     * @return the populated, not yet persisted, bundle action bean
     */
    private BundleActionBean createBundleAction(String jobId, String coordName, boolean isCritical) {
        BundleActionBean action = new BundleActionBean();
        action.setBundleActionId(jobId + "_" + coordName);
        action.setBundleId(jobId);
        action.setCoordName(coordName);
        action.setStatus(Job.Status.PREP);
        action.setLastModifiedTime(new Date());
        if (isCritical) {
            action.setCritical();
        }
        else {
            action.resetCritical();
        }
        return action;
    }

    /**
     * Start Coord Jobs.
     * <p>
     * For every {@code coordinator} element of the bundle XML, merges the configuration
     * (see {@link #mergeConfig(Element)}), tags it with the bundle id and queues a
     * {@link CoordSubmitXCommand}. Afterwards the pending counter of every bundle action
     * in {@code insertList} is incremented.
     *
     * @throws CommandException thrown if failed to start coord jobs: E1305 for a
     *         coordinator without a name, E1301 if the bundle XML cannot be parsed,
     *         E0604 if no bundle job has been loaded
     */
    @SuppressWarnings("unchecked")
    private void startCoordJobs() throws CommandException {
        if (bundleJob != null) {
            try {
                Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
                for (Element coordElem : coordElems) {
                    Attribute name = coordElem.getAttribute("name");
                    if (name == null) {
                        // Same error as insertBundleActions() instead of an NPE on name.getValue().
                        throw new CommandException(ErrorCode.E1305);
                    }
                    Configuration coordConf = mergeConfig(coordElem);
                    coordConf.set(OozieClient.BUNDLE_ID, jobId);

                    queue(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(), bundleJob.getId(),
                            name.getValue()));
                }
                updateBundleAction();
            }
            catch (JDOMException jex) {
                throw new CommandException(ErrorCode.E1301, jex);
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }
        }
        else {
            throw new CommandException(ErrorCode.E0604, jobId);
        }
    }

    /**
     * Increments the pending counter and refreshes the last-modified time of every
     * bundle action currently held in {@code insertList}.
     *
     * @throws JPAExecutorException declared for callers; nothing in this body throws it
     */
    private void updateBundleAction() throws JPAExecutorException {
        for (JsonBean bAction : insertList) {
            BundleActionBean action = (BundleActionBean) bAction;
            action.incrementAndGetPending();
            action.setLastModifiedTime(new Date());
        }
    }

    /**
     * Merge Bundle job config and the configuration from the coord job to pass
     * to Coord Engine
     *
     * @param coordElem the coordinator configuration
     * @return Configuration merged configuration
     * @throws CommandException thrown if failed to merge configuration: E1306 if the
     *         bundle job configuration cannot be parsed, E1307 if the coordinator's local
     *         configuration cannot be parsed, E1301 if the coordinator has no
     *         {@code app-path} element, E1001 if the app path cannot be normalized
     */
    private Configuration mergeConfig(Element coordElem) throws CommandException {
        String jobConf = bundleJob.getConf();
        // Step 1: runConf = jobConf
        Configuration runConf = null;
        try {
            runConf = new XConfiguration(new StringReader(jobConf));
        }
        catch (IOException e1) {
            LOG.warn("Configuration parse error in:" + jobConf);
            throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
        }
        // Step 2: Merge local properties into runConf
        // extract 'property' tags under 'configuration' block in the coordElem
        // convert Element to XConfiguration
        Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());

        if (localConfigElement != null) {
            String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
            Configuration localConf;
            try {
                localConf = new XConfiguration(new StringReader(strConfig));
            }
            catch (IOException e1) {
                LOG.warn("Configuration parse error in:" + strConfig);
                throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
            }

            // copy configuration properties in the coordElem to the runConf
            XConfiguration.copy(localConf, runConf);
        }

        // Step 3: Extract value of 'app-path' in coordElem, save it as a
        // new property called 'oozie.coord.application.path', and normalize.
        Element appPathElement = coordElem.getChild("app-path", coordElem.getNamespace());
        if (appPathElement == null) {
            // getChild returns null for a missing child; fail with a command error rather than an NPE.
            // NOTE(review): E1301 is reused here as the bundle-definition error already used in
            // this class - confirm it is the most appropriate code.
            throw new CommandException(ErrorCode.E1301, "coordinator [" + coordElem.getAttributeValue("name")
                    + "] has no 'app-path' element");
        }
        String appPath = appPathElement.getValue();
        runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
        // Normalize coordinator appPath here;
        try {
            JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME),
                    runConf);
        }
        catch (IOException e) {
            // Pass the IOException as the trailing parameter, as the E1306/E1307 throws above do,
            // so the underlying failure is not lost.
            throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH), e);
        }
        return runConf;
    }

    /**
     * Returns the bundle job bean loaded by this command.
     *
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return bundleJob;
    }

    /**
     * Adds the bundle job to {@code updateList} so that it is written by
     * {@link #performWrites()}.
     *
     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
     */
    @Override
    public void updateJob() throws CommandException {
        updateList.add(bundleJob);
    }
}