001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.bundle; 020 021import java.io.IOException; 022import java.io.StringReader; 023import java.util.Date; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.Map.Entry; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.oozie.BundleActionBean; 031import org.apache.oozie.BundleJobBean; 032import org.apache.oozie.ErrorCode; 033import org.apache.oozie.XException; 034import org.apache.oozie.action.hadoop.OozieJobInfo; 035import org.apache.oozie.client.Job; 036import org.apache.oozie.client.OozieClient; 037import org.apache.oozie.client.rest.JsonBean; 038import org.apache.oozie.command.CommandException; 039import org.apache.oozie.command.PreconditionException; 040import org.apache.oozie.command.StartTransitionXCommand; 041import org.apache.oozie.executor.jpa.BatchQueryExecutor; 042import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; 043import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; 044import org.apache.oozie.executor.jpa.JPAExecutorException; 045import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 046import org.apache.oozie.util.ELUtils; 047import org.apache.oozie.util.JobUtils; 048import org.apache.oozie.util.LogUtils; 049import org.apache.oozie.util.ParamChecker; 050import org.apache.oozie.util.XConfiguration; 051import org.apache.oozie.util.XmlUtils; 052import org.jdom.Attribute; 053import org.jdom.Element; 054import org.jdom.JDOMException; 055 056/** 057 * The command to start Bundle job 058 */ 059public class BundleStartXCommand extends StartTransitionXCommand { 060 private final String jobId; 061 private BundleJobBean bundleJob; 062 063 /** 064 * The constructor for class {@link BundleStartXCommand} 065 * 066 * @param jobId the bundle job id 067 */ 068 public BundleStartXCommand(String jobId) { 069 super("bundle_start", "bundle_start", 1); 070 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 071 } 072 073 /** 074 * The constructor for class {@link BundleStartXCommand} 075 * 076 * @param jobId the bundle job id 077 * @param dryrun true if dryrun is enable 078 */ 079 public BundleStartXCommand(String jobId, boolean dryrun) { 080 super("bundle_start", "bundle_start", 1, dryrun); 081 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 082 } 083 084 /* (non-Javadoc) 085 * @see org.apache.oozie.command.XCommand#getEntityKey() 086 */ 087 @Override 088 public String getEntityKey() { 089 return jobId; 090 } 091 092 @Override 093 public String getKey() { 094 return getName() + "_" + jobId; 095 } 096 097 /* (non-Javadoc) 098 * @see org.apache.oozie.command.XCommand#isLockRequired() 099 */ 100 @Override 101 protected boolean isLockRequired() { 102 return true; 103 } 104 105 @Override 106 protected void verifyPrecondition() throws CommandException, PreconditionException { 107 if (bundleJob.getStatus() != Job.Status.PREP) { 108 String msg = "Bundle " + bundleJob.getId() + " is not in PREP status. It is in : " + bundleJob.getStatus(); 109 LOG.info(msg); 110 throw new PreconditionException(ErrorCode.E1100, msg); 111 } 112 } 113 114 @Override 115 public void loadState() throws CommandException { 116 try { 117 this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, jobId); 118 LogUtils.setLogInfo(bundleJob); 119 super.setJob(bundleJob); 120 } 121 catch (XException ex) { 122 throw new CommandException(ex); 123 } 124 } 125 126 /* (non-Javadoc) 127 * @see org.apache.oozie.command.StartTransitionXCommand#StartChildren() 128 */ 129 @Override 130 public void StartChildren() throws CommandException { 131 LOG.debug("Started coord jobs for the bundle=[{0}]", jobId); 132 insertBundleActions(); 133 startCoordJobs(); 134 LOG.debug("Ended coord jobs for the bundle=[{0}]", jobId); 135 } 136 137 /* (non-Javadoc) 138 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 139 */ 140 @Override 141 public void notifyParent() { 142 } 143 144 /* (non-Javadoc) 145 * @see org.apache.oozie.command.StartTransitionXCommand#performWrites() 146 */ 147 @Override 148 public void performWrites() throws CommandException { 149 try { 150 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 151 } 152 catch (JPAExecutorException e) { 153 throw new CommandException(e); 154 } 155 } 156 157 /** 158 * Insert bundle actions 159 * 160 * @throws CommandException thrown if failed to create bundle actions 161 */ 162 @SuppressWarnings("unchecked") 163 private void insertBundleActions() throws CommandException { 164 if (bundleJob != null) { 165 Map<String, Boolean> map = new HashMap<String, Boolean>(); 166 try { 167 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml()); 168 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 169 for (Element elem : coordElems) { 170 Attribute name = elem.getAttribute("name"); 171 Attribute critical = elem.getAttribute("critical"); 172 if (name != null) { 173 if (map.containsKey(name.getValue())) { 174 throw new CommandException(ErrorCode.E1304, name); 175 } 176 Configuration coordConf = mergeConfig(elem); 177 // skip coord job if it is not enabled 178 if (!isEnabled(elem, coordConf)) { 179 continue; 180 } 181 boolean isCritical = false; 182 if (critical != null && Boolean.parseBoolean(critical.getValue())) { 183 isCritical = true; 184 } 185 map.put(name.getValue(), isCritical); 186 } 187 else { 188 throw new CommandException(ErrorCode.E1305); 189 } 190 } 191 } 192 catch (JDOMException jex) { 193 throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex); 194 } 195 196 // if there is no coordinator for this bundle, failed it. 197 if (map.isEmpty()) { 198 bundleJob.setStatus(Job.Status.FAILED); 199 bundleJob.resetPending(); 200 try { 201 BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob); 202 } 203 catch (JPAExecutorException jex) { 204 throw new CommandException(jex); 205 } 206 207 LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId); 208 throw new CommandException(ErrorCode.E1318, jobId); 209 } 210 211 for (Entry<String, Boolean> coordName : map.entrySet()) { 212 BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue()); 213 insertList.add(action); 214 } 215 } 216 else { 217 throw new CommandException(ErrorCode.E0604, jobId); 218 } 219 } 220 221 private BundleActionBean createBundleAction(String jobId, String coordName, boolean isCritical) { 222 BundleActionBean action = new BundleActionBean(); 223 action.setBundleActionId(jobId + "_" + coordName); 224 action.setBundleId(jobId); 225 action.setCoordName(coordName); 226 action.setStatus(Job.Status.PREP); 227 action.setLastModifiedTime(new Date()); 228 if (isCritical) { 229 action.setCritical(); 230 } 231 else { 232 action.resetCritical(); 233 } 234 return action; 235 } 236 237 /** 238 * Start Coord Jobs 239 * 240 * @throws CommandException thrown if failed to start coord jobs 241 */ 242 @SuppressWarnings("unchecked") 243 private void startCoordJobs() throws CommandException { 244 if (bundleJob != null) { 245 try { 246 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml()); 247 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 248 for (Element coordElem : coordElems) { 249 Attribute name = coordElem.getAttribute("name"); 250 251 Configuration coordConf = mergeConfig(coordElem); 252 coordConf.set(OozieClient.BUNDLE_ID, jobId); 253 if (OozieJobInfo.isJobInfoEnabled()) { 254 coordConf.set(OozieJobInfo.BUNDLE_NAME, bundleJob.getAppName()); 255 } 256 // skip coord job if it is not enabled 257 if (!isEnabled(coordElem, coordConf)) { 258 continue; 259 } 260 String coordName=name.getValue(); 261 try { 262 coordName = ELUtils.resolveAppName(coordName, coordConf); 263 } 264 catch (Exception e) { 265 throw new CommandException(ErrorCode.E1321, e.getMessage(), e); 266 267 } 268 queue(new BundleCoordSubmitXCommand(coordConf, bundleJob.getId(), name.getValue())); 269 270 } 271 updateBundleAction(); 272 } 273 catch (JDOMException jex) { 274 throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex); 275 } 276 catch (JPAExecutorException je) { 277 throw new CommandException(je); 278 } 279 } 280 else { 281 throw new CommandException(ErrorCode.E0604, jobId); 282 } 283 } 284 285 private void updateBundleAction() throws JPAExecutorException { 286 for(JsonBean bAction : insertList) { 287 BundleActionBean action = (BundleActionBean) bAction; 288 action.incrementAndGetPending(); 289 action.setLastModifiedTime(new Date()); 290 } 291 } 292 293 /** 294 * Merge Bundle job config and the configuration from the coord job to pass 295 * to Coord Engine 296 * 297 * @param coordElem the coordinator configuration 298 * @return Configuration merged configuration 299 * @throws CommandException thrown if failed to merge configuration 300 */ 301 private Configuration mergeConfig(Element coordElem) throws CommandException { 302 String jobConf = bundleJob.getConf(); 303 // Step 1: runConf = jobConf 304 Configuration runConf = null; 305 try { 306 runConf = new XConfiguration(new StringReader(jobConf)); 307 } 308 catch (IOException e1) { 309 LOG.warn("Configuration parse error in:" + jobConf); 310 throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1); 311 } 312 // Step 2: Merge local properties into runConf 313 // extract 'property' tags under 'configuration' block in the coordElem 314 // convert Element to XConfiguration 315 Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace()); 316 317 if (localConfigElement != null) { 318 String strConfig = XmlUtils.prettyPrint(localConfigElement).toString(); 319 Configuration localConf; 320 try { 321 localConf = new XConfiguration(new StringReader(strConfig)); 322 } 323 catch (IOException e1) { 324 LOG.warn("Configuration parse error in:" + strConfig); 325 throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1); 326 } 327 328 // copy configuration properties in the coordElem to the runConf 329 XConfiguration.copy(localConf, runConf); 330 } 331 332 // Step 3: Extract value of 'app-path' in coordElem, save it as a 333 // new property called 'oozie.coord.application.path', and normalize. 334 String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue(); 335 runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath); 336 // Normalize coordinator appPath here; 337 try { 338 JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf); 339 } 340 catch (IOException e) { 341 throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH)); 342 } 343 return runConf; 344 } 345 346 /* (non-Javadoc) 347 * @see org.apache.oozie.command.TransitionXCommand#getJob() 348 */ 349 @Override 350 public Job getJob() { 351 return bundleJob; 352 } 353 354 /* (non-Javadoc) 355 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 356 */ 357 @Override 358 public void updateJob() throws CommandException { 359 updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob)); 360 } 361 362 /** 363 * Checks whether the coordinator is enabled 364 * 365 * @param coordElem 366 * @param coordConf 367 * @return true if coordinator is enabled, otherwise false. 368 * @throws CommandException 369 */ 370 private boolean isEnabled(Element coordElem, Configuration coordConf) throws CommandException { 371 Attribute enabled = coordElem.getAttribute("enabled"); 372 if (enabled == null) { 373 // default is true 374 return true; 375 } 376 String isEnabled = enabled.getValue(); 377 try { 378 isEnabled = ELUtils.resolveAppName(isEnabled, coordConf); 379 } 380 catch (Exception e) { 381 throw new CommandException(ErrorCode.E1321, e.getMessage(), e); 382 } 383 return Boolean.parseBoolean(isEnabled); 384 } 385 386}