/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.oozie.AppType;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.ELUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.sla.SLAOperations;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.ELService;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.WorkflowLib;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.db.SLADbOperations;
import org.apache.oozie.service.SchemaService.SchemaName;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.rest.JsonBean;
import org.jdom.Element;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

/**
 * Command that submits a workflow job.
 * <p>
 * It parses the workflow application referenced by the job configuration, merges the optional
 * <code>config-default.xml</code> found next to the application, creates a workflow instance and, unless running as
 * a dry run, persists the resulting {@link WorkflowJobBean} (in <code>PREP</code> status) together with any SLA
 * registration events.
 */
@SuppressWarnings("deprecation")
public class SubmitXCommand extends WorkflowXCommand<String> {
    public static final String CONFIG_DEFAULT = "config-default.xml";

    private Configuration conf;
    private List<JsonBean> insertList = new ArrayList<JsonBean>();
    private String parentId;

    /**
     * Constructor to create the workflow Submit Command.
     *
     * @param conf : Configuration for workflow job
     */
    public SubmitXCommand(Configuration conf) {
        super("submit", "submit", 1);
        this.conf = ParamChecker.notNull(conf, "conf");
    }

    /**
     * Constructor for submitting wf through coordinator
     *
     * @param conf : Configuration for workflow job
     * @param parentId : the coord action id
     */
    public SubmitXCommand(Configuration conf, String parentId) {
        this(conf);
        this.parentId = parentId;
    }

    /**
     * Constructor to create the workflow Submit Command, optionally as a dry run.
     *
     * @param dryrun : if true the job is validated but nothing is persisted
     * @param conf : Configuration for workflow job
     */
    public SubmitXCommand(boolean dryrun, Configuration conf) {
        this(conf);
        this.dryrun = dryrun;
    }

    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();

    static {
        String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);

        // The default-properties set is the user set plus the Hadoop user property.
        String[] badDefaultProps = {PropertiesUtils.HADOOP_USER};
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
    }

    /**
     * Submit the workflow job described by the configuration given at construction time.
     *
     * @return the id of the created workflow job; <code>"OK"</code> for a dry run; <code>null</code> if no
     *         {@link JPAService} is available
     * @throws CommandException thrown if the application cannot be parsed, the configuration is invalid, SLA
     *         validation fails or the job cannot be stored
     */
    @Override
    protected String execute() throws CommandException {
        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
        WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
        try {
            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
            WorkflowApp app = wps.parseDef(conf);
            XConfiguration protoActionConf = wps.createProtoActionConf(conf, true);
            WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();

            String user = conf.get(OozieClient.USER_NAME);
            // NOTE(review): the returned value is not used below; the call is kept because it presumably performs
            // the deprecated-property check as a side effect - confirm against ConfigUtils.
            String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
            URI uri = new URI(conf.get(OozieClient.APP_PATH));
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            Configuration fsConf = has.createJobConf(uri.getAuthority());
            FileSystem fs = has.createFileSystem(user, uri, fsConf);

            Path configDefault = null;
            // app path could be a directory
            Path path = new Path(uri.getPath());
            if (!fs.isFile(path)) {
                configDefault = new Path(path, CONFIG_DEFAULT);
            } else {
                configDefault = new Path(path.getParent(), CONFIG_DEFAULT);
            }

            if (fs.exists(configDefault)) {
                try {
                    InputStream defaultConfStream = fs.open(configDefault);
                    Configuration defaultConf;
                    try {
                        defaultConf = new XConfiguration(defaultConfStream);
                    }
                    finally {
                        // Always release the file system stream, even if parsing fails.
                        defaultConfStream.close();
                    }
                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
                    XConfiguration.injectDefaults(defaultConf, conf);
                }
                catch (IOException ex) {
                    throw new IOException("default configuration file, " + ex.getMessage(), ex);
                }
            }

            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);

            // Resolving all variables in the job properties.
            // This ensures the Hadoop Configuration semantics is preserved.
            XConfiguration resolvedVarsConf = new XConfiguration();
            for (Map.Entry<String, String> entry : conf) {
                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
            }
            conf = resolvedVarsConf;

            WorkflowInstance wfInstance;
            try {
                wfInstance = workflowLib.createInstance(app, conf);
            }
            catch (WorkflowException e) {
                throw new StoreException(e);
            }

            // From here on the configuration held by the workflow instance is used, not the submitted one.
            Configuration wfConf = wfInstance.getConf();

            WorkflowJobBean workflow = new WorkflowJobBean();
            workflow.setId(wfInstance.getId());
            workflow.setAppName(ELUtils.resolveAppName(app.getName(), wfConf));
            workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
            workflow.setConf(XmlUtils.prettyPrint(wfConf).toString());
            workflow.setProtoActionConf(protoActionConf.toXmlString());
            workflow.setCreatedTime(new Date());
            workflow.setLastModifiedTime(new Date());
            workflow.setLogToken(wfConf.get(OozieClient.LOG_TOKEN, ""));
            workflow.setStatus(WorkflowJob.Status.PREP);
            workflow.setRun(0);
            workflow.setUser(wfConf.get(OozieClient.USER_NAME));
            workflow.setGroup(wfConf.get(OozieClient.GROUP_NAME));
            workflow.setWorkflowInstance(wfInstance);
            workflow.setExternalId(wfConf.get(OozieClient.EXTERNAL_ID));
            // Set parent id if it doesn't already have one (for subworkflows)
            if (workflow.getParentId() == null) {
                workflow.setParentId(wfConf.get(SubWorkflowActionExecutor.PARENT_ID));
            }
            // Set to coord action Id if workflow submitted through coordinator
            if (workflow.getParentId() == null) {
                workflow.setParentId(parentId);
            }

            LogUtils.setLogInfo(workflow, logInfo);
            LOG = XLog.resetPrefix(LOG);
            LOG.debug("Workflow record created, Status [{0}]", workflow.getStatus());
            Element wfElem = XmlUtils.parseXml(app.getDefinition());
            ELEvaluator evalSla = createELEvaluatorForGroup(wfConf, "wf-sla-submit");
            // SLA definitions are validated for dry runs too; only persistence is skipped.
            String jobSlaXml = verifySlaElements(wfElem, evalSla);
            if (!dryrun) {
                writeSLARegistration(wfElem, jobSlaXml, workflow.getId(), workflow.getParentId(), workflow.getUser(),
                        workflow.getGroup(), workflow.getAppName(), LOG, evalSla);
                workflow.setSlaXml(jobSlaXml);

                insertList.add(workflow);
                JPAService jpaService = Services.get().get(JPAService.class);
                if (jpaService != null) {
                    try {
                        jpaService.execute(new BulkUpdateInsertJPAExecutor(null, insertList));
                    }
                    catch (JPAExecutorException je) {
                        throw new CommandException(je);
                    }
                }
                else {
                    LOG.error(ErrorCode.E0610);
                    return null;
                }

                return workflow.getId();
            }
            else {
                return "OK";
            }
        }
        catch (WorkflowException ex) {
            throw new CommandException(ex);
        }
        catch (HadoopAccessorException ex) {
            throw new CommandException(ex);
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
        }
    }

    /**
     * Validate the SLA definition of the workflow job and of each of its actions.
     *
     * @param eWfJob workflow definition root element
     * @param evalSla sla evaluator
     * @return the resolved job level sla xml, or an empty string if the job defines no sla
     * @throws CommandException thrown if any sla element fails EL evaluation or schema validation
     */
    private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException {
        String jobSlaXml = "";
        // Validate WF job
        Element eSla = XmlUtils.getSLAElement(eWfJob);
        if (eSla != null) {
            jobSlaXml = resolveSla(eSla, evalSla);
        }

        // Validate all actions
        for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
            // Look up the SLA of the action itself, not of the enclosing job.
            eSla = XmlUtils.getSLAElement(action);
            if (eSla != null) {
                resolveSla(eSla, evalSla);
            }
        }
        return jobSlaXml;
    }

    /**
     * Create the SLA registration events for the workflow job and for every action that defines an sla.
     * <p>
     * The event returned by {@link SLADbOperations} for the job, if any, is queued in the insert list of this
     * command; the {@link SLAOperations} registrations are invoked directly.
     *
     * @param eWfJob workflow definition root element
     * @param slaXml resolved job level sla xml, may be null or empty if the job defines no sla
     * @param jobId workflow job id
     * @param parentId parent id of the workflow job
     * @param user user of the workflow job
     * @param group group of the workflow job
     * @param appName workflow application name
     * @param log logger handed to the sla operations
     * @param evalSla sla evaluator used to resolve the action level sla elements
     * @throws CommandException thrown with error code E1007 if any registration fails
     */
    private void writeSLARegistration(Element eWfJob, String slaXml, String jobId, String parentId, String user,
            String group, String appName, XLog log, ELEvaluator evalSla) throws CommandException {
        try {
            if (slaXml != null && slaXml.length() > 0) {
                Element eSla = XmlUtils.parseXml(slaXml);
                SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, jobId,
                        SlaAppType.WORKFLOW_JOB, user, group, log);
                if (slaEvent != null) {
                    insertList.add(slaEvent);
                }
                // insert into new table
                SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName,
                        log, false);
            }
            // Add sla for wf actions
            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
                Element actionSla = XmlUtils.getSLAElement(action);
                if (actionSla != null) {
                    String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla);
                    actionSla = XmlUtils.parseXml(actionSlaXml);
                    String actionId = Services.get().get(UUIDService.class)
                            .generateChildId(jobId, action.getAttributeValue("name") + "");
                    SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION,
                            user, appName, log, false);
                }
            }
        }
        catch (Exception e) {
            // The cause is attached to the rethrown exception, so it is not printed here.
            throw new CommandException(ErrorCode.E1007, "workflow " + jobId, e.getMessage(), e);
        }
    }

    /**
     * Resolve variables in sla xml element.
     *
     * @param eSla sla xml element
     * @param evalSla sla evaluator
     * @return sla xml string after evaluation
     * @throws CommandException thrown with error code E1004 if EL evaluation or schema validation fails
     */
    public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException {
        // EL evaluation
        String slaXml = XmlUtils.prettyPrint(eSla).toString();
        try {
            slaXml = XmlUtils.removeComments(slaXml);
            slaXml = evalSla.evaluate(slaXml, String.class);
            XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
            return slaXml;
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1004, "Validation error :" + e.getMessage(), e);
        }
    }

    /**
     * Create an EL evaluator for a given group.
     *
     * @param conf configuration variable
     * @param group group variable
     * @return the evaluator created for the group
     */
    public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
        ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
        // Expose every configuration property to the evaluator as a variable.
        for (Map.Entry<String, String> entry : conf) {
            eval.setVariable(entry.getKey(), entry.getValue());
        }
        return eval;
    }

    /**
     * @return always <code>null</code>; this command reports no entity key
     */
    @Override
    public String getEntityKey() {
        return null;
    }

    /**
     * @return always <code>false</code>
     */
    @Override
    protected boolean isLockRequired() {
        return false;
    }

    /**
     * No state is loaded by this command.
     */
    @Override
    protected void loadState() {

    }

    /**
     * No precondition is verified by this command.
     */
    @Override
    protected void verifyPrecondition() throws CommandException {

    }

}