/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.command.wf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.oozie.AppType;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.util.ELUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.sla.SLAOperations;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.ELService;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.WorkflowLib;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.db.SLADbOperations;
import org.apache.oozie.service.SchemaService.SchemaName;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.rest.JsonBean;
import org.jdom.Element;
import org.jdom.filter.ElementFilter;

import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

/**
 * Command that submits a workflow job.
 * <p>
 * The command reads the application path from the job configuration, merges an optional
 * <code>config-default.xml</code> found next to the application, creates a workflow instance and a
 * {@link WorkflowJobBean} in <code>PREP</code> status and, unless it is a dry run, registers the SLA
 * elements and stores the job through the batch query executor.
 */
@SuppressWarnings("deprecation")
public class SubmitXCommand extends WorkflowXCommand<String> {
    public static final String CONFIG_DEFAULT = "config-default.xml";

    private Configuration conf;
    private List<JsonBean> insertList = new ArrayList<JsonBean>();
    private String parentId;

    /**
     * Constructor to create the workflow Submit Command.
     *
     * @param conf : Configuration for workflow job
     */
    public SubmitXCommand(Configuration conf) {
        super("submit", "submit", 1);
        this.conf = ParamChecker.notNull(conf, "conf");
    }

    /**
     * Constructor for submitting wf through coordinator
     *
     * @param conf : Configuration for workflow job
     * @param parentId the coord action id
     */
    public SubmitXCommand(Configuration conf, String parentId) {
        this(conf);
        this.parentId = parentId;
    }

    /**
     * Constructor to create the workflow Submit Command.
     *
     * @param dryrun : if dryrun
     * @param conf : Configuration for workflow job
     */
    public SubmitXCommand(boolean dryrun, Configuration conf) {
        this(conf);
        this.dryrun = dryrun;
    }

    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();

    static {
        String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);

        // The default-properties set is the user set plus the Hadoop user property.
        String[] badDefaultProps = {PropertiesUtils.HADOOP_USER};
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
    }

    /**
     * Submit the workflow described by the job configuration.
     *
     * @return the id of the created workflow job; the string <code>"OK"</code> for a dry run; or
     *         <code>null</code> when no {@link JPAService} is available to store the job
     * @throws CommandException if the application cannot be read or parsed, a disallowed property is set,
     *         variable or SLA resolution fails, or the job cannot be stored
     */
    @Override
    protected String execute() throws CommandException {
        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
        WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
        try {
            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
            String user = conf.get(OozieClient.USER_NAME);
            URI uri = new URI(conf.get(OozieClient.APP_PATH));
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            Configuration fsConf = has.createConfiguration(uri.getAuthority());
            FileSystem fs = has.createFileSystem(user, uri, fsConf);

            Path configDefault = null;
            Configuration defaultConf = null;
            // app path could be a directory
            Path path = new Path(uri.getPath());
            if (!fs.isFile(path)) {
                configDefault = new Path(path, CONFIG_DEFAULT);
            }
            else {
                configDefault = new Path(path.getParent(), CONFIG_DEFAULT);
            }

            if (fs.exists(configDefault)) {
                try {
                    defaultConf = readConfiguration(fs, configDefault);
                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
                    XConfiguration.injectDefaults(defaultConf, conf);
                }
                catch (IOException ex) {
                    throw new IOException("default configuration file, " + ex.getMessage(), ex);
                }
            }
            if (defaultConf != null) {
                defaultConf = resolveDefaultConfVariables(defaultConf);
            }

            WorkflowApp app = wps.parseDef(conf, defaultConf);
            XConfiguration protoActionConf = wps.createProtoActionConf(conf, true);
            WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();

            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);

            // Resolving all variables in the job properties.
            // This ensures the Hadoop Configuration semantics is preserved.
            XConfiguration resolvedVarsConf = new XConfiguration();
            for (Map.Entry<String, String> entry : conf) {
                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
            }
            conf = resolvedVarsConf;

            WorkflowInstance wfInstance;
            try {
                wfInstance = workflowLib.createInstance(app, conf);
            }
            catch (WorkflowException e) {
                throw new StoreException(e);
            }

            // From here on the configuration held by the workflow instance is used; it gets its own
            // name so it is not confused with the 'conf' field used above.
            Configuration wfConf = wfInstance.getConf();

            WorkflowJobBean workflow = new WorkflowJobBean();
            workflow.setId(wfInstance.getId());
            workflow.setAppName(ELUtils.resolveAppName(app.getName(), wfConf));
            workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
            workflow.setConf(XmlUtils.prettyPrint(wfConf).toString());
            workflow.setProtoActionConf(protoActionConf.toXmlString());
            workflow.setCreatedTime(new Date());
            workflow.setLastModifiedTime(new Date());
            workflow.setLogToken(wfConf.get(OozieClient.LOG_TOKEN, ""));
            workflow.setStatus(WorkflowJob.Status.PREP);
            workflow.setRun(0);
            workflow.setUser(wfConf.get(OozieClient.USER_NAME));
            workflow.setGroup(wfConf.get(OozieClient.GROUP_NAME));
            workflow.setWorkflowInstance(wfInstance);
            workflow.setExternalId(wfConf.get(OozieClient.EXTERNAL_ID));
            // Set parent id if it doesn't already have one (for subworkflows)
            if (workflow.getParentId() == null) {
                workflow.setParentId(wfConf.get(SubWorkflowActionExecutor.PARENT_ID));
            }
            // Set to coord action Id if workflow submitted through coordinator
            if (workflow.getParentId() == null) {
                workflow.setParentId(parentId);
            }

            LogUtils.setLogInfo(workflow);
            LOG.debug("Workflow record created, Status [{0}]", workflow.getStatus());
            Element wfElem = XmlUtils.parseXml(app.getDefinition());
            ELEvaluator evalSla = createELEvaluatorForGroup(wfConf, "wf-sla-submit");
            String jobSlaXml = verifySlaElements(wfElem, evalSla);
            if (!dryrun) {
                writeSLARegistration(wfElem, jobSlaXml, workflow.getId(), workflow.getParentId(), workflow.getUser(),
                        workflow.getGroup(), workflow.getAppName(), LOG, evalSla);
                workflow.setSlaXml(jobSlaXml);

                insertList.add(workflow);
                JPAService jpaService = Services.get().get(JPAService.class);
                if (jpaService != null) {
                    try {
                        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
                    }
                    catch (JPAExecutorException je) {
                        throw new CommandException(je);
                    }
                }
                else {
                    LOG.error(ErrorCode.E0610);
                    return null;
                }

                return workflow.getId();
            }
            else {
                // Checking variable substitution for dryrun
                ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(workflow, null, false, false);
                Element workflowXml = XmlUtils.parseXml(app.getDefinition());
                removeSlaElements(workflowXml);
                String workflowXmlString = XmlUtils.removeComments(XmlUtils.prettyPrint(workflowXml).toString());
                workflowXmlString = context.getELEvaluator().evaluate(workflowXmlString, String.class);
                workflowXml = XmlUtils.parseXml(workflowXmlString);

                Iterator<Element> it = workflowXml.getDescendants(new ElementFilter("job-xml"));

                // Checking all variable substitutions in job-xml files
                while (it.hasNext()) {
                    Element e = it.next();
                    String jobXml = e.getTextTrim();
                    Path xmlPath = new Path(workflow.getAppPath(), jobXml);
                    Configuration jobXmlConf = readConfiguration(fs, xmlPath);

                    String jobXmlConfString = XmlUtils.prettyPrint(jobXmlConf).toString();
                    jobXmlConfString = XmlUtils.removeComments(jobXmlConfString);
                    context.getELEvaluator().evaluate(jobXmlConfString, String.class);
                }

                return "OK";
            }
        }
        catch (WorkflowException ex) {
            throw new CommandException(ex);
        }
        catch (HadoopAccessorException ex) {
            throw new CommandException(ex);
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
        }
    }

    /**
     * Read a configuration file from the given file system, always closing the underlying stream.
     *
     * @param fs file system to read from
     * @param file path of the configuration file
     * @return the parsed configuration
     * @throws IOException if the file cannot be opened, parsed or closed
     */
    private static XConfiguration readConfiguration(FileSystem fs, Path file) throws IOException {
        InputStream in = fs.open(file);
        try {
            return new XConfiguration(in);
        }
        finally {
            // The stream is opened here, so it is released here regardless of the parse outcome.
            in.close();
        }
    }

    /**
     * Resolving variables from config-default, which might be referencing into conf/defaultConf
     *
     * @param defaultConf config-default.xml
     * @return resolved config-default configuration.
     */
    private Configuration resolveDefaultConfVariables(Configuration defaultConf) {
        XConfiguration resolveDefaultConf = new XConfiguration();
        for (Map.Entry<String, String> entry : defaultConf) {
            String defaultConfKey = entry.getKey();
            String defaultConfValue = entry.getValue();
            // if value is referencing some other key, first check within the default config to resolve,
            // then job.properties (conf)
            if (defaultConfValue.contains("$") && defaultConf.get(defaultConfKey).contains("$")) {
                resolveDefaultConf.set(defaultConfKey, conf.get(defaultConfKey));
            }
            else {
                resolveDefaultConf.set(defaultConfKey, defaultConf.get(defaultConfKey));
            }
        }
        return resolveDefaultConf;
    }

    /**
     * Remove the SLA element from the workflow element and from each of its direct
     * <code>action</code> children.
     *
     * @param eWfJob workflow xml element, modified in place
     */
    private void removeSlaElements(Element eWfJob) {
        Element sla = XmlUtils.getSLAElement(eWfJob);
        if (sla != null) {
            eWfJob.removeChildren(sla.getName(), sla.getNamespace());
        }

        for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
            sla = XmlUtils.getSLAElement(action);
            if (sla != null) {
                action.removeChildren(sla.getName(), sla.getNamespace());
            }
        }
    }

    /**
     * Resolve and validate the SLA element of the workflow and of each of its direct
     * <code>action</code> children.
     *
     * @param eWfJob workflow xml element
     * @param evalSla sla evaluator
     * @return the resolved job-level sla xml, or an empty string when the workflow has no SLA element
     * @throws CommandException if any SLA element fails evaluation or validation
     */
    private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException {
        String jobSlaXml = "";
        // Validate WF job
        Element eSla = XmlUtils.getSLAElement(eWfJob);
        if (eSla != null) {
            jobSlaXml = resolveSla(eSla, evalSla);
        }

        // Validate all actions
        for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
            eSla = XmlUtils.getSLAElement(action);
            if (eSla != null) {
                resolveSla(eSla, evalSla);
            }
        }
        return jobSlaXml;
    }

    /**
     * Create the SLA registration events for the workflow job and for each action that has an SLA element.
     * A job-level event returned by {@link SLADbOperations} is queued in the insert list.
     *
     * @param eWfJob workflow xml element
     * @param slaXml resolved job-level sla xml; nothing is registered for the job when null or empty
     * @param jobId workflow job id
     * @param parentId parent id of the workflow job
     * @param user user name
     * @param group group name
     * @param appName application name
     * @param log logger passed to the SLA operations
     * @param evalSla sla evaluator used to resolve the action-level SLA elements
     * @throws CommandException with error code E1007 if any step fails
     */
    private void writeSLARegistration(Element eWfJob, String slaXml, String jobId, String parentId, String user,
            String group, String appName, XLog log, ELEvaluator evalSla) throws CommandException {
        try {
            if (slaXml != null && slaXml.length() > 0) {
                Element eSla = XmlUtils.parseXml(slaXml);
                SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, jobId,
                        SlaAppType.WORKFLOW_JOB, user, group, log);
                if (slaEvent != null) {
                    insertList.add(slaEvent);
                }
                // insert into new table
                SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName,
                        log, false);
            }
            // Add sla for wf actions
            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
                Element actionSla = XmlUtils.getSLAElement(action);
                if (actionSla != null) {
                    String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla);
                    actionSla = XmlUtils.parseXml(actionSlaXml);
                    String actionId = Services.get().get(UUIDService.class)
                            .generateChildId(jobId, action.getAttributeValue("name") + "");
                    SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION,
                            user, appName, log, false);
                }
            }
        }
        catch (Exception e) {
            // The cause travels with the CommandException, so nothing is printed to stderr here.
            throw new CommandException(ErrorCode.E1007, "workflow " + jobId, e.getMessage(), e);
        }
    }

    /**
     * Resolve variables in sla xml element.
     *
     * @param eSla sla xml element
     * @param evalSla sla evaluator
     * @return sla xml string after evaluation
     * @throws CommandException with error code E1004 if evaluation or schema validation fails
     */
    public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException {
        // EL evaluation
        String slaXml = XmlUtils.prettyPrint(eSla).toString();
        try {
            slaXml = XmlUtils.removeComments(slaXml);
            slaXml = evalSla.evaluate(slaXml, String.class);
            XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
            return slaXml;
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1004, "Validation error :" + e.getMessage(), e);
        }
    }

    /**
     * Create an EL evaluator for a given group.
     *
     * @param conf configuration variable
     * @param group group variable
     * @return the evaluator created for the group
     */
    public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
        ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
        for (Map.Entry<String, String> entry : conf) {
            eval.setVariable(entry.getKey(), entry.getValue());
        }
        return eval;
    }

    /**
     * @return always <code>null</code>
     */
    @Override
    public String getEntityKey() {
        return null;
    }

    /**
     * @return always <code>false</code>
     */
    @Override
    protected boolean isLockRequired() {
        return false;
    }

    /**
     * Intentionally empty: this command loads no state.
     */
    @Override
    protected void loadState() {

    }

    /**
     * Intentionally empty: this command checks no preconditions.
     */
    @Override
    protected void verifyPrecondition() throws CommandException {

    }

}