001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.bundle; 019 020import java.io.IOException; 021import java.io.StringReader; 022import java.util.Date; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Map; 026import java.util.Map.Entry; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.oozie.BundleActionBean; 030import org.apache.oozie.BundleJobBean; 031import org.apache.oozie.ErrorCode; 032import org.apache.oozie.XException; 033import org.apache.oozie.action.hadoop.OozieJobInfo; 034import org.apache.oozie.client.Job; 035import org.apache.oozie.client.OozieClient; 036import org.apache.oozie.client.rest.JsonBean; 037import org.apache.oozie.command.CommandException; 038import org.apache.oozie.command.PreconditionException; 039import org.apache.oozie.command.StartTransitionXCommand; 040import org.apache.oozie.command.coord.CoordSubmitXCommand; 041import org.apache.oozie.executor.jpa.BatchQueryExecutor; 042import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; 043import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; 044import org.apache.oozie.executor.jpa.JPAExecutorException; 045import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 046import org.apache.oozie.util.JobUtils; 047import org.apache.oozie.util.LogUtils; 048import org.apache.oozie.util.ParamChecker; 049import org.apache.oozie.util.XConfiguration; 050import org.apache.oozie.util.XmlUtils; 051import org.jdom.Attribute; 052import org.jdom.Element; 053import org.jdom.JDOMException; 054 055/** 056 * The command to start Bundle job 057 */ 058public class BundleStartXCommand extends StartTransitionXCommand { 059 private final String jobId; 060 private BundleJobBean bundleJob; 061 062 /** 063 * The constructor for class {@link BundleStartXCommand} 064 * 065 * @param jobId the bundle job id 066 */ 067 public BundleStartXCommand(String jobId) { 068 super("bundle_start", "bundle_start", 1); 069 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 070 } 071 072 /** 073 * The constructor for class {@link BundleStartXCommand} 074 * 075 * @param jobId the bundle job id 076 * @param dryrun true if dryrun is enable 077 */ 078 public BundleStartXCommand(String jobId, boolean dryrun) { 079 super("bundle_start", "bundle_start", 1, dryrun); 080 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 081 } 082 083 /* (non-Javadoc) 084 * @see org.apache.oozie.command.XCommand#getEntityKey() 085 */ 086 @Override 087 public String getEntityKey() { 088 return jobId; 089 } 090 091 @Override 092 public String getKey() { 093 return getName() + "_" + jobId; 094 } 095 096 /* (non-Javadoc) 097 * @see org.apache.oozie.command.XCommand#isLockRequired() 098 */ 099 @Override 100 protected boolean isLockRequired() { 101 return true; 102 } 103 104 @Override 105 protected void verifyPrecondition() throws CommandException, PreconditionException { 106 if (bundleJob.getStatus() != Job.Status.PREP) { 107 String msg = "Bundle " + bundleJob.getId() + " is not in PREP status. It is in : " + bundleJob.getStatus(); 108 LOG.info(msg); 109 throw new PreconditionException(ErrorCode.E1100, msg); 110 } 111 } 112 113 @Override 114 public void loadState() throws CommandException { 115 try { 116 this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, jobId); 117 LogUtils.setLogInfo(bundleJob, logInfo); 118 super.setJob(bundleJob); 119 } 120 catch (XException ex) { 121 throw new CommandException(ex); 122 } 123 } 124 125 /* (non-Javadoc) 126 * @see org.apache.oozie.command.StartTransitionXCommand#StartChildren() 127 */ 128 @Override 129 public void StartChildren() throws CommandException { 130 LOG.debug("Started coord jobs for the bundle=[{0}]", jobId); 131 insertBundleActions(); 132 startCoordJobs(); 133 LOG.debug("Ended coord jobs for the bundle=[{0}]", jobId); 134 } 135 136 /* (non-Javadoc) 137 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 138 */ 139 @Override 140 public void notifyParent() { 141 } 142 143 /* (non-Javadoc) 144 * @see org.apache.oozie.command.StartTransitionXCommand#performWrites() 145 */ 146 @Override 147 public void performWrites() throws CommandException { 148 try { 149 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 150 } 151 catch (JPAExecutorException e) { 152 throw new CommandException(e); 153 } 154 } 155 156 /** 157 * Insert bundle actions 158 * 159 * @throws CommandException thrown if failed to create bundle actions 160 */ 161 @SuppressWarnings("unchecked") 162 private void insertBundleActions() throws CommandException { 163 if (bundleJob != null) { 164 Map<String, Boolean> map = new HashMap<String, Boolean>(); 165 try { 166 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml()); 167 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 168 for (Element elem : coordElems) { 169 Attribute name = elem.getAttribute("name"); 170 Attribute critical = elem.getAttribute("critical"); 171 if (name != null) { 172 if (map.containsKey(name.getValue())) { 173 throw new CommandException(ErrorCode.E1304, name); 174 } 175 boolean isCritical = false; 176 if (critical != null && Boolean.parseBoolean(critical.getValue())) { 177 isCritical = true; 178 } 179 map.put(name.getValue(), isCritical); 180 } 181 else { 182 throw new CommandException(ErrorCode.E1305); 183 } 184 } 185 } 186 catch (JDOMException jex) { 187 throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex); 188 } 189 190 // if there is no coordinator for this bundle, failed it. 191 if (map.isEmpty()) { 192 bundleJob.setStatus(Job.Status.FAILED); 193 bundleJob.resetPending(); 194 try { 195 BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob); 196 } 197 catch (JPAExecutorException jex) { 198 throw new CommandException(jex); 199 } 200 201 LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId); 202 throw new CommandException(ErrorCode.E1318, jobId); 203 } 204 205 for (Entry<String, Boolean> coordName : map.entrySet()) { 206 BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue()); 207 insertList.add(action); 208 } 209 } 210 else { 211 throw new CommandException(ErrorCode.E0604, jobId); 212 } 213 } 214 215 private BundleActionBean createBundleAction(String jobId, String coordName, boolean isCritical) { 216 BundleActionBean action = new BundleActionBean(); 217 action.setBundleActionId(jobId + "_" + coordName); 218 action.setBundleId(jobId); 219 action.setCoordName(coordName); 220 action.setStatus(Job.Status.PREP); 221 action.setLastModifiedTime(new Date()); 222 if (isCritical) { 223 action.setCritical(); 224 } 225 else { 226 action.resetCritical(); 227 } 228 return action; 229 } 230 231 /** 232 * Start Coord Jobs 233 * 234 * @throws CommandException thrown if failed to start coord jobs 235 */ 236 @SuppressWarnings("unchecked") 237 private void startCoordJobs() throws CommandException { 238 if (bundleJob != null) { 239 try { 240 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml()); 241 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 242 for (Element coordElem : coordElems) { 243 Attribute name = coordElem.getAttribute("name"); 244 Configuration coordConf = mergeConfig(coordElem); 245 coordConf.set(OozieClient.BUNDLE_ID, jobId); 246 if (OozieJobInfo.isJobInfoEnabled()) { 247 coordConf.set(OozieJobInfo.BUNDLE_NAME, bundleJob.getAppName()); 248 } 249 250 queue(new CoordSubmitXCommand(coordConf, bundleJob.getId(), name.getValue())); 251 252 } 253 updateBundleAction(); 254 } 255 catch (JDOMException jex) { 256 throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex); 257 } 258 catch (JPAExecutorException je) { 259 throw new CommandException(je); 260 } 261 } 262 else { 263 throw new CommandException(ErrorCode.E0604, jobId); 264 } 265 } 266 267 private void updateBundleAction() throws JPAExecutorException { 268 for(JsonBean bAction : insertList) { 269 BundleActionBean action = (BundleActionBean) bAction; 270 action.incrementAndGetPending(); 271 action.setLastModifiedTime(new Date()); 272 } 273 } 274 275 /** 276 * Merge Bundle job config and the configuration from the coord job to pass 277 * to Coord Engine 278 * 279 * @param coordElem the coordinator configuration 280 * @return Configuration merged configuration 281 * @throws CommandException thrown if failed to merge configuration 282 */ 283 private Configuration mergeConfig(Element coordElem) throws CommandException { 284 String jobConf = bundleJob.getConf(); 285 // Step 1: runConf = jobConf 286 Configuration runConf = null; 287 try { 288 runConf = new XConfiguration(new StringReader(jobConf)); 289 } 290 catch (IOException e1) { 291 LOG.warn("Configuration parse error in:" + jobConf); 292 throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1); 293 } 294 // Step 2: Merge local properties into runConf 295 // extract 'property' tags under 'configuration' block in the coordElem 296 // convert Element to XConfiguration 297 Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace()); 298 299 if (localConfigElement != null) { 300 String strConfig = XmlUtils.prettyPrint(localConfigElement).toString(); 301 Configuration localConf; 302 try { 303 localConf = new XConfiguration(new StringReader(strConfig)); 304 } 305 catch (IOException e1) { 306 LOG.warn("Configuration parse error in:" + strConfig); 307 throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1); 308 } 309 310 // copy configuration properties in the coordElem to the runConf 311 XConfiguration.copy(localConf, runConf); 312 } 313 314 // Step 3: Extract value of 'app-path' in coordElem, save it as a 315 // new property called 'oozie.coord.application.path', and normalize. 316 String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue(); 317 runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath); 318 // Normalize coordinator appPath here; 319 try { 320 JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf); 321 } 322 catch (IOException e) { 323 throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH)); 324 } 325 return runConf; 326 } 327 328 /* (non-Javadoc) 329 * @see org.apache.oozie.command.TransitionXCommand#getJob() 330 */ 331 @Override 332 public Job getJob() { 333 return bundleJob; 334 } 335 336 /* (non-Javadoc) 337 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 338 */ 339 @Override 340 public void updateJob() throws CommandException { 341 updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob)); 342 } 343}