001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.hadoop.fs.Path; 022 import org.apache.hadoop.fs.FileSystem; 023 import org.apache.oozie.SLAEventBean; 024 import org.apache.oozie.WorkflowJobBean; 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.service.HadoopAccessorException; 027 import org.apache.oozie.service.JPAService; 028 import org.apache.oozie.service.WorkflowStoreService; 029 import org.apache.oozie.service.WorkflowAppService; 030 import org.apache.oozie.service.HadoopAccessorService; 031 import org.apache.oozie.service.Services; 032 import org.apache.oozie.service.DagXLogInfoService; 033 import org.apache.oozie.util.ConfigUtils; 034 import org.apache.oozie.util.ELUtils; 035 import org.apache.oozie.util.LogUtils; 036 import org.apache.oozie.util.XLog; 037 import org.apache.oozie.util.ParamChecker; 038 import org.apache.oozie.util.XConfiguration; 039 import org.apache.oozie.util.XmlUtils; 040 import org.apache.oozie.command.CommandException; 041 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 042 import org.apache.oozie.executor.jpa.JPAExecutorException; 043 import org.apache.oozie.service.ELService; 044 import org.apache.oozie.service.SchemaService; 045 import org.apache.oozie.store.StoreException; 046 import org.apache.oozie.workflow.WorkflowApp; 047 import org.apache.oozie.workflow.WorkflowException; 048 import org.apache.oozie.workflow.WorkflowInstance; 049 import org.apache.oozie.workflow.WorkflowLib; 050 import org.apache.oozie.util.ELEvaluator; 051 import org.apache.oozie.util.InstrumentUtils; 052 import org.apache.oozie.util.PropertiesUtils; 053 import org.apache.oozie.util.db.SLADbOperations; 054 import org.apache.oozie.service.SchemaService.SchemaName; 055 import org.apache.oozie.client.OozieClient; 056 import org.apache.oozie.client.WorkflowJob; 057 import org.apache.oozie.client.SLAEvent.SlaAppType; 058 import org.apache.oozie.client.rest.JsonBean; 059 import org.jdom.Element; 060 import org.jdom.Namespace; 061 062 import java.util.ArrayList; 063 import java.util.Date; 064 import java.util.List; 065 import java.util.Map; 066 import java.util.Set; 067 import java.util.HashSet; 068 import java.io.IOException; 069 import java.net.URI; 070 071 public class SubmitXCommand extends WorkflowXCommand<String> { 072 public static final String CONFIG_DEFAULT = "config-default.xml"; 073 074 private Configuration conf; 075 private String authToken; 076 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 077 078 /** 079 * Constructor to create the workflow Submit Command. 080 * 081 * @param conf : Configuration for workflow job 082 * @param authToken : To be used for authentication 083 */ 084 public SubmitXCommand(Configuration conf, String authToken) { 085 super("submit", "submit", 1); 086 this.conf = ParamChecker.notNull(conf, "conf"); 087 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 088 } 089 090 /** 091 * Constructor to create the workflow Submit Command. 092 * 093 * @param dryrun : if dryrun 094 * @param conf : Configuration for workflow job 095 * @param authToken : To be used for authentication 096 */ 097 public SubmitXCommand(boolean dryrun, Configuration conf, String authToken) { 098 this(conf, authToken); 099 this.dryrun = dryrun; 100 } 101 102 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 103 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 104 105 static { 106 String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES, 107 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB, 108 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, 109 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS}; 110 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 111 112 String[] badDefaultProps = {PropertiesUtils.HADOOP_USER}; 113 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 114 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 115 } 116 117 @Override 118 protected String execute() throws CommandException { 119 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 120 WorkflowAppService wps = Services.get().get(WorkflowAppService.class); 121 try { 122 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN)); 123 WorkflowApp app = wps.parseDef(conf, authToken); 124 XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true); 125 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB(); 126 127 String user = conf.get(OozieClient.USER_NAME); 128 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 129 URI uri = new URI(conf.get(OozieClient.APP_PATH)); 130 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 131 Configuration fsConf = has.createJobConf(uri.getAuthority()); 132 FileSystem fs = has.createFileSystem(user, uri, fsConf); 133 134 Path configDefault = null; 135 // app path could be a directory 136 Path path = new Path(uri.getPath()); 137 if (!fs.isFile(path)) { 138 configDefault = new Path(path, CONFIG_DEFAULT); 139 } else { 140 configDefault = new Path(path.getParent(), CONFIG_DEFAULT); 141 } 142 143 if (fs.exists(configDefault)) { 144 try { 145 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 146 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 147 XConfiguration.injectDefaults(defaultConf, conf); 148 } 149 catch (IOException ex) { 150 throw new IOException("default configuration file, " + ex.getMessage(), ex); 151 } 152 } 153 154 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 155 156 // Resolving all variables in the job properties. 157 // This ensures the Hadoop Configuration semantics is preserved. 158 XConfiguration resolvedVarsConf = new XConfiguration(); 159 for (Map.Entry<String, String> entry : conf) { 160 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 161 } 162 conf = resolvedVarsConf; 163 164 WorkflowInstance wfInstance; 165 try { 166 wfInstance = workflowLib.createInstance(app, conf); 167 } 168 catch (WorkflowException e) { 169 throw new StoreException(e); 170 } 171 172 Configuration conf = wfInstance.getConf(); 173 // System.out.println("WF INSTANCE CONF:"); 174 // System.out.println(XmlUtils.prettyPrint(conf).toString()); 175 176 WorkflowJobBean workflow = new WorkflowJobBean(); 177 workflow.setId(wfInstance.getId()); 178 workflow.setAppName(ELUtils.resolveAppName(app.getName(), conf)); 179 workflow.setAppPath(conf.get(OozieClient.APP_PATH)); 180 workflow.setConf(XmlUtils.prettyPrint(conf).toString()); 181 workflow.setProtoActionConf(protoActionConf.toXmlString()); 182 workflow.setCreatedTime(new Date()); 183 workflow.setLastModifiedTime(new Date()); 184 workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, "")); 185 workflow.setStatus(WorkflowJob.Status.PREP); 186 workflow.setRun(0); 187 workflow.setUser(conf.get(OozieClient.USER_NAME)); 188 workflow.setGroup(conf.get(OozieClient.GROUP_NAME)); 189 workflow.setAuthToken(authToken); 190 workflow.setWorkflowInstance(wfInstance); 191 workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID)); 192 193 LogUtils.setLogInfo(workflow, logInfo); 194 LOG = XLog.resetPrefix(LOG); 195 LOG.debug("Workflow record created, Status [{0}]", workflow.getStatus()); 196 Element wfElem = XmlUtils.parseXml(app.getDefinition()); 197 ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit"); 198 String jobSlaXml = verifySlaElements(wfElem, evalSla); 199 if (!dryrun) { 200 writeSLARegistration(jobSlaXml, workflow.getId(), workflow.getUser(), workflow.getGroup(), LOG); 201 workflow.setSlaXml(jobSlaXml); 202 // System.out.println("SlaXml :"+ slaXml); 203 204 //store.insertWorkflow(workflow); 205 insertList.add(workflow); 206 JPAService jpaService = Services.get().get(JPAService.class); 207 if (jpaService != null) { 208 try { 209 jpaService.execute(new BulkUpdateInsertJPAExecutor(null, insertList)); 210 } 211 catch (JPAExecutorException je) { 212 throw new CommandException(je); 213 } 214 } 215 else { 216 LOG.error(ErrorCode.E0610); 217 return null; 218 } 219 220 return workflow.getId(); 221 } 222 else { 223 return "OK"; 224 } 225 } 226 catch (WorkflowException ex) { 227 throw new CommandException(ex); 228 } 229 catch (HadoopAccessorException ex) { 230 throw new CommandException(ex); 231 } 232 catch (Exception ex) { 233 throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex); 234 } 235 } 236 237 private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException { 238 String jobSlaXml = ""; 239 // Validate WF job 240 Element eSla = eWfJob.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI)); 241 if (eSla != null) { 242 jobSlaXml = resolveSla(eSla, evalSla); 243 } 244 245 // Validate all actions 246 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) { 247 eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI)); 248 if (eSla != null) { 249 resolveSla(eSla, evalSla); 250 } 251 } 252 return jobSlaXml; 253 } 254 255 private void writeSLARegistration(String slaXml, String id, String user, String group, XLog log) 256 throws CommandException { 257 try { 258 if (slaXml != null && slaXml.length() > 0) { 259 Element eSla = XmlUtils.parseXml(slaXml); 260 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, id, 261 SlaAppType.WORKFLOW_JOB, user, group, log); 262 if(slaEvent != null) { 263 insertList.add(slaEvent); 264 } 265 } 266 } 267 catch (Exception e) { 268 e.printStackTrace(); 269 throw new CommandException(ErrorCode.E1007, "workflow " + id, e.getMessage(), e); 270 } 271 } 272 273 /** 274 * Resolve variables in sla xml element. 275 * 276 * @param eSla sla xml element 277 * @param evalSla sla evaluator 278 * @return sla xml string after evaluation 279 * @throws CommandException 280 */ 281 public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException { 282 // EL evaluation 283 String slaXml = XmlUtils.prettyPrint(eSla).toString(); 284 try { 285 slaXml = XmlUtils.removeComments(slaXml); 286 slaXml = evalSla.evaluate(slaXml, String.class); 287 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL); 288 return slaXml; 289 } 290 catch (Exception e) { 291 throw new CommandException(ErrorCode.E1004, "Validation erro :" + e.getMessage(), e); 292 } 293 } 294 295 /** 296 * Create an EL evaluator for a given group. 297 * 298 * @param conf configuration variable 299 * @param group group variable 300 * @return the evaluator created for the group 301 */ 302 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) { 303 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group); 304 for (Map.Entry<String, String> entry : conf) { 305 eval.setVariable(entry.getKey(), entry.getValue()); 306 } 307 return eval; 308 } 309 310 @Override 311 public String getEntityKey() { 312 return null; 313 } 314 315 @Override 316 protected boolean isLockRequired() { 317 return false; 318 } 319 320 @Override 321 protected void loadState() { 322 323 } 324 325 @Override 326 protected void verifyPrecondition() throws CommandException { 327 328 } 329 330 }