001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.wf; 019 020import org.apache.hadoop.conf.Configuration; 021import org.apache.hadoop.fs.Path; 022import org.apache.hadoop.fs.FileSystem; 023import org.apache.oozie.AppType; 024import org.apache.oozie.SLAEventBean; 025import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext; 026import org.apache.oozie.WorkflowJobBean; 027import org.apache.oozie.ErrorCode; 028import org.apache.oozie.action.oozie.SubWorkflowActionExecutor; 029import org.apache.oozie.service.HadoopAccessorException; 030import org.apache.oozie.service.JPAService; 031import org.apache.oozie.service.UUIDService; 032import org.apache.oozie.service.WorkflowStoreService; 033import org.apache.oozie.service.WorkflowAppService; 034import org.apache.oozie.service.HadoopAccessorService; 035import org.apache.oozie.service.Services; 036import org.apache.oozie.service.DagXLogInfoService; 037import org.apache.oozie.util.ELUtils; 038import org.apache.oozie.util.LogUtils; 039import org.apache.oozie.sla.SLAOperations; 040import org.apache.oozie.util.XLog; 041import org.apache.oozie.util.ParamChecker; 042import org.apache.oozie.util.XConfiguration; 043import org.apache.oozie.util.XmlUtils; 044import org.apache.oozie.command.CommandException; 045import org.apache.oozie.executor.jpa.BatchQueryExecutor; 046import org.apache.oozie.executor.jpa.JPAExecutorException; 047import org.apache.oozie.service.ELService; 048import org.apache.oozie.store.StoreException; 049import org.apache.oozie.workflow.WorkflowApp; 050import org.apache.oozie.workflow.WorkflowException; 051import org.apache.oozie.workflow.WorkflowInstance; 052import org.apache.oozie.workflow.WorkflowLib; 053import org.apache.oozie.util.ELEvaluator; 054import org.apache.oozie.util.InstrumentUtils; 055import org.apache.oozie.util.PropertiesUtils; 056import org.apache.oozie.util.db.SLADbOperations; 057import org.apache.oozie.service.SchemaService.SchemaName; 058import org.apache.oozie.client.OozieClient; 059import org.apache.oozie.client.WorkflowJob; 060import org.apache.oozie.client.SLAEvent.SlaAppType; 061import org.apache.oozie.client.rest.JsonBean; 062import org.jdom.Element; 063import org.jdom.filter.ElementFilter; 064 065import java.util.ArrayList; 066import java.util.Date; 067import java.util.Iterator; 068import java.util.List; 069import java.util.Map; 070import java.util.Set; 071import java.util.HashSet; 072import java.io.IOException; 073import java.net.URI; 074 075@SuppressWarnings("deprecation") 076public class SubmitXCommand extends WorkflowXCommand<String> { 077 public static final String CONFIG_DEFAULT = "config-default.xml"; 078 079 private Configuration conf; 080 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 081 private String parentId; 082 083 /** 084 * Constructor to create the workflow Submit Command. 085 * 086 * @param conf : Configuration for workflow job 087 */ 088 public SubmitXCommand(Configuration conf) { 089 super("submit", "submit", 1); 090 this.conf = ParamChecker.notNull(conf, "conf"); 091 } 092 093 /** 094 * Constructor for submitting wf through coordinator 095 * 096 * @param conf : Configuration for workflow job 097 * @param parentId: the coord action id 098 */ 099 public SubmitXCommand(Configuration conf, String parentId) { 100 this(conf); 101 this.parentId = parentId; 102 } 103 104 /** 105 * Constructor to create the workflow Submit Command. 106 * 107 * @param dryrun : if dryrun 108 * @param conf : Configuration for workflow job 109 * @param authToken : To be used for authentication 110 */ 111 public SubmitXCommand(boolean dryrun, Configuration conf) { 112 this(conf); 113 this.dryrun = dryrun; 114 } 115 116 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 117 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 118 119 static { 120 String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES, 121 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB, 122 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, 123 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS}; 124 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 125 126 String[] badDefaultProps = {PropertiesUtils.HADOOP_USER}; 127 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 128 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 129 } 130 131 @Override 132 protected String execute() throws CommandException { 133 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 134 WorkflowAppService wps = Services.get().get(WorkflowAppService.class); 135 try { 136 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN)); 137 String user = conf.get(OozieClient.USER_NAME); 138 URI uri = new URI(conf.get(OozieClient.APP_PATH)); 139 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 140 Configuration fsConf = has.createJobConf(uri.getAuthority()); 141 FileSystem fs = has.createFileSystem(user, uri, fsConf); 142 143 Path configDefault = null; 144 Configuration defaultConf = null; 145 // app path could be a directory 146 Path path = new Path(uri.getPath()); 147 if (!fs.isFile(path)) { 148 configDefault = new Path(path, CONFIG_DEFAULT); 149 } else { 150 configDefault = new Path(path.getParent(), CONFIG_DEFAULT); 151 } 152 153 if (fs.exists(configDefault)) { 154 try { 155 defaultConf = new XConfiguration(fs.open(configDefault)); 156 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 157 XConfiguration.injectDefaults(defaultConf, conf); 158 } 159 catch (IOException ex) { 160 throw new IOException("default configuration file, " + ex.getMessage(), ex); 161 } 162 } 163 164 WorkflowApp app = wps.parseDef(conf, defaultConf); 165 XConfiguration protoActionConf = wps.createProtoActionConf(conf, true); 166 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB(); 167 168 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 169 170 // Resolving all variables in the job properties. 171 // This ensures the Hadoop Configuration semantics is preserved. 172 XConfiguration resolvedVarsConf = new XConfiguration(); 173 for (Map.Entry<String, String> entry : conf) { 174 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 175 } 176 conf = resolvedVarsConf; 177 178 WorkflowInstance wfInstance; 179 try { 180 wfInstance = workflowLib.createInstance(app, conf); 181 } 182 catch (WorkflowException e) { 183 throw new StoreException(e); 184 } 185 186 Configuration conf = wfInstance.getConf(); 187 // System.out.println("WF INSTANCE CONF:"); 188 // System.out.println(XmlUtils.prettyPrint(conf).toString()); 189 190 WorkflowJobBean workflow = new WorkflowJobBean(); 191 workflow.setId(wfInstance.getId()); 192 workflow.setAppName(ELUtils.resolveAppName(app.getName(), conf)); 193 workflow.setAppPath(conf.get(OozieClient.APP_PATH)); 194 workflow.setConf(XmlUtils.prettyPrint(conf).toString()); 195 workflow.setProtoActionConf(protoActionConf.toXmlString()); 196 workflow.setCreatedTime(new Date()); 197 workflow.setLastModifiedTime(new Date()); 198 workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, "")); 199 workflow.setStatus(WorkflowJob.Status.PREP); 200 workflow.setRun(0); 201 workflow.setUser(conf.get(OozieClient.USER_NAME)); 202 workflow.setGroup(conf.get(OozieClient.GROUP_NAME)); 203 workflow.setWorkflowInstance(wfInstance); 204 workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID)); 205 // Set parent id if it doesn't already have one (for subworkflows) 206 if (workflow.getParentId() == null) { 207 workflow.setParentId(conf.get(SubWorkflowActionExecutor.PARENT_ID)); 208 } 209 // Set to coord action Id if workflow submitted through coordinator 210 if (workflow.getParentId() == null) { 211 workflow.setParentId(parentId); 212 } 213 214 LogUtils.setLogInfo(workflow, logInfo); 215 LOG = XLog.resetPrefix(LOG); 216 LOG.debug("Workflow record created, Status [{0}]", workflow.getStatus()); 217 Element wfElem = XmlUtils.parseXml(app.getDefinition()); 218 ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit"); 219 String jobSlaXml = verifySlaElements(wfElem, evalSla); 220 if (!dryrun) { 221 writeSLARegistration(wfElem, jobSlaXml, workflow.getId(), workflow.getParentId(), workflow.getUser(), 222 workflow.getGroup(), workflow.getAppName(), LOG, evalSla); 223 workflow.setSlaXml(jobSlaXml); 224 // System.out.println("SlaXml :"+ slaXml); 225 226 //store.insertWorkflow(workflow); 227 insertList.add(workflow); 228 JPAService jpaService = Services.get().get(JPAService.class); 229 if (jpaService != null) { 230 try { 231 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); 232 } 233 catch (JPAExecutorException je) { 234 throw new CommandException(je); 235 } 236 } 237 else { 238 LOG.error(ErrorCode.E0610); 239 return null; 240 } 241 242 return workflow.getId(); 243 } 244 else { 245 // Checking variable substitution for dryrun 246 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(workflow, null, false, false); 247 Element workflowXml = XmlUtils.parseXml(app.getDefinition()); 248 removeSlaElements(workflowXml); 249 String workflowXmlString = XmlUtils.removeComments(XmlUtils.prettyPrint(workflowXml).toString()); 250 workflowXmlString = context.getELEvaluator().evaluate(workflowXmlString, String.class); 251 workflowXml = XmlUtils.parseXml(workflowXmlString); 252 253 Iterator<Element> it = workflowXml.getDescendants(new ElementFilter("job-xml")); 254 255 // Checking all variable substitutions in job-xml files 256 while (it.hasNext()) { 257 Element e = it.next(); 258 String jobXml = e.getTextTrim(); 259 Path xmlPath = new Path(workflow.getAppPath(), jobXml); 260 Configuration jobXmlConf = new XConfiguration(fs.open(xmlPath)); 261 262 263 String jobXmlConfString = XmlUtils.prettyPrint(jobXmlConf).toString(); 264 jobXmlConfString = XmlUtils.removeComments(jobXmlConfString); 265 context.getELEvaluator().evaluate(jobXmlConfString, String.class); 266 } 267 268 return "OK"; 269 } 270 } 271 catch (WorkflowException ex) { 272 throw new CommandException(ex); 273 } 274 catch (HadoopAccessorException ex) { 275 throw new CommandException(ex); 276 } 277 catch (Exception ex) { 278 throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex); 279 } 280 } 281 282 private void removeSlaElements(Element eWfJob) { 283 Element sla = XmlUtils.getSLAElement(eWfJob); 284 if (sla != null) { 285 eWfJob.removeChildren(sla.getName(), sla.getNamespace()); 286 } 287 288 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) { 289 sla = XmlUtils.getSLAElement(action); 290 if (sla != null) { 291 action.removeChildren(sla.getName(), sla.getNamespace()); 292 } 293 } 294 } 295 private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException { 296 String jobSlaXml = ""; 297 // Validate WF job 298 Element eSla = XmlUtils.getSLAElement(eWfJob); 299 if (eSla != null) { 300 jobSlaXml = resolveSla(eSla, evalSla); 301 } 302 303 // Validate all actions 304 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) { 305 eSla = XmlUtils.getSLAElement(action); 306 if (eSla != null) { 307 resolveSla(eSla, evalSla); 308 } 309 } 310 return jobSlaXml; 311 } 312 313 private void writeSLARegistration(Element eWfJob, String slaXml, String jobId, String parentId, String user, 314 String group, String appName, XLog log, ELEvaluator evalSla) throws CommandException { 315 try { 316 if (slaXml != null && slaXml.length() > 0) { 317 Element eSla = XmlUtils.parseXml(slaXml); 318 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, jobId, 319 SlaAppType.WORKFLOW_JOB, user, group, log); 320 if(slaEvent != null) { 321 insertList.add(slaEvent); 322 } 323 // insert into new table 324 SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName, 325 log, false); 326 } 327 // Add sla for wf actions 328 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) { 329 Element actionSla = XmlUtils.getSLAElement(action); 330 if (actionSla != null) { 331 String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla); 332 actionSla = XmlUtils.parseXml(actionSlaXml); 333 String actionId = Services.get().get(UUIDService.class) 334 .generateChildId(jobId, action.getAttributeValue("name") + ""); 335 SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION, 336 user, appName, log, false); 337 } 338 } 339 } 340 catch (Exception e) { 341 e.printStackTrace(); 342 throw new CommandException(ErrorCode.E1007, "workflow " + jobId, e.getMessage(), e); 343 } 344 } 345 346 /** 347 * Resolve variables in sla xml element. 348 * 349 * @param eSla sla xml element 350 * @param evalSla sla evaluator 351 * @return sla xml string after evaluation 352 * @throws CommandException 353 */ 354 public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException { 355 // EL evaluation 356 String slaXml = XmlUtils.prettyPrint(eSla).toString(); 357 try { 358 slaXml = XmlUtils.removeComments(slaXml); 359 slaXml = evalSla.evaluate(slaXml, String.class); 360 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL); 361 return slaXml; 362 } 363 catch (Exception e) { 364 throw new CommandException(ErrorCode.E1004, "Validation error :" + e.getMessage(), e); 365 } 366 } 367 368 /** 369 * Create an EL evaluator for a given group. 370 * 371 * @param conf configuration variable 372 * @param group group variable 373 * @return the evaluator created for the group 374 */ 375 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) { 376 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group); 377 for (Map.Entry<String, String> entry : conf) { 378 eval.setVariable(entry.getKey(), entry.getValue()); 379 } 380 return eval; 381 } 382 383 @Override 384 public String getEntityKey() { 385 return null; 386 } 387 388 @Override 389 protected boolean isLockRequired() { 390 return false; 391 } 392 393 @Override 394 protected void loadState() { 395 396 } 397 398 @Override 399 protected void verifyPrecondition() throws CommandException { 400 401 } 402 403}