001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.hadoop.fs.Path; 022 import org.apache.hadoop.fs.FileSystem; 023 import org.apache.oozie.WorkflowJobBean; 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.service.HadoopAccessorException; 026 import org.apache.oozie.service.JPAService; 027 import org.apache.oozie.service.WorkflowStoreService; 028 import org.apache.oozie.service.WorkflowAppService; 029 import org.apache.oozie.service.HadoopAccessorService; 030 import org.apache.oozie.service.Services; 031 import org.apache.oozie.service.DagXLogInfoService; 032 import org.apache.oozie.util.XLog; 033 import org.apache.oozie.util.ParamChecker; 034 import org.apache.oozie.util.XConfiguration; 035 import org.apache.oozie.util.XmlUtils; 036 import org.apache.oozie.command.CommandException; 037 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor; 038 import org.apache.oozie.service.ELService; 039 import org.apache.oozie.service.SchemaService; 040 import org.apache.oozie.store.StoreException; 041 import org.apache.oozie.workflow.WorkflowApp; 042 import org.apache.oozie.workflow.WorkflowException; 043 import org.apache.oozie.workflow.WorkflowInstance; 044 import org.apache.oozie.workflow.WorkflowLib; 045 import org.apache.oozie.util.ELEvaluator; 046 import org.apache.oozie.util.InstrumentUtils; 047 import org.apache.oozie.util.PropertiesUtils; 048 import org.apache.oozie.util.db.SLADbOperations; 049 import org.apache.oozie.service.SchemaService.SchemaName; 050 import org.apache.oozie.client.OozieClient; 051 import org.apache.oozie.client.WorkflowJob; 052 import org.apache.oozie.client.SLAEvent.SlaAppType; 053 import org.jdom.Element; 054 import org.jdom.Namespace; 055 056 import java.util.Date; 057 import java.util.List; 058 import java.util.Map; 059 import java.util.Set; 060 import java.util.HashSet; 061 import java.io.IOException; 062 import java.net.URI; 063 064 public class SubmitXCommand extends WorkflowXCommand<String> { 065 public static final String CONFIG_DEFAULT = "config-default.xml"; 066 067 private Configuration conf; 068 private String authToken; 069 070 public SubmitXCommand(Configuration conf, String authToken) { 071 super("submit", "submit", 1); 072 this.conf = ParamChecker.notNull(conf, "conf"); 073 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 074 } 075 076 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 077 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 078 079 static { 080 String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES, 081 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB, 082 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, 083 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS}; 084 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 085 086 String[] badDefaultProps = {PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI, 087 WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME}; 088 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 089 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 090 } 091 092 @Override 093 protected String execute() throws CommandException { 094 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 095 WorkflowAppService wps = Services.get().get(WorkflowAppService.class); 096 try { 097 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN)); 098 WorkflowApp app = wps.parseDef(conf, authToken); 099 XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true); 100 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB(); 101 102 String user = conf.get(OozieClient.USER_NAME); 103 String group = conf.get(OozieClient.GROUP_NAME); 104 URI uri = new URI(conf.get(OozieClient.APP_PATH)); 105 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, 106 group, uri, new Configuration()); 107 108 Path configDefault = null; 109 // app path could be a directory 110 Path path = new Path(uri.getPath()); 111 if (!fs.isFile(path)) { 112 configDefault = new Path(path, SubmitCommand.CONFIG_DEFAULT); 113 } else { 114 configDefault = new Path(path.getParent(), SubmitCommand.CONFIG_DEFAULT); 115 } 116 117 if (fs.exists(configDefault)) { 118 try { 119 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 120 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 121 XConfiguration.injectDefaults(defaultConf, conf); 122 } 123 catch (IOException ex) { 124 throw new IOException("default configuration file, " + ex.getMessage(), ex); 125 } 126 } 127 128 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 129 130 // Resolving all variables in the job properties. 131 // This ensures the Hadoop Configuration semantics is preserved. 132 XConfiguration resolvedVarsConf = new XConfiguration(); 133 for (Map.Entry<String, String> entry : conf) { 134 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 135 } 136 conf = resolvedVarsConf; 137 138 WorkflowInstance wfInstance; 139 try { 140 wfInstance = workflowLib.createInstance(app, conf); 141 } 142 catch (WorkflowException e) { 143 throw new StoreException(e); 144 } 145 146 Configuration conf = wfInstance.getConf(); 147 // System.out.println("WF INSTANCE CONF:"); 148 // System.out.println(XmlUtils.prettyPrint(conf).toString()); 149 150 WorkflowJobBean workflow = new WorkflowJobBean(); 151 workflow.setId(wfInstance.getId()); 152 workflow.setAppName(app.getName()); 153 workflow.setAppPath(conf.get(OozieClient.APP_PATH)); 154 workflow.setConf(XmlUtils.prettyPrint(conf).toString()); 155 workflow.setProtoActionConf(protoActionConf.toXmlString()); 156 workflow.setCreatedTime(new Date()); 157 workflow.setLastModifiedTime(new Date()); 158 workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, "")); 159 workflow.setStatus(WorkflowJob.Status.PREP); 160 workflow.setRun(0); 161 workflow.setUser(conf.get(OozieClient.USER_NAME)); 162 workflow.setGroup(conf.get(OozieClient.GROUP_NAME)); 163 workflow.setAuthToken(authToken); 164 workflow.setWorkflowInstance(wfInstance); 165 workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID)); 166 167 //setLogInfo(workflow); 168 Element wfElem = XmlUtils.parseXml(app.getDefinition()); 169 ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit"); 170 String jobSlaXml = verifySlaElements(wfElem, evalSla); 171 writeSLARegistration(jobSlaXml, workflow.getId(), workflow.getUser(), workflow.getGroup(), LOG); 172 workflow.setSlaXml(jobSlaXml); 173 // System.out.println("SlaXml :"+ slaXml); 174 175 //store.insertWorkflow(workflow); 176 JPAService jpaService = Services.get().get(JPAService.class); 177 if (jpaService != null) { 178 jpaService.execute(new WorkflowJobInsertJPAExecutor(workflow)); 179 } 180 else { 181 LOG.error(ErrorCode.E0610); 182 return null; 183 } 184 185 return workflow.getId(); 186 } 187 catch (WorkflowException ex) { 188 throw new CommandException(ex); 189 } 190 catch (HadoopAccessorException ex) { 191 throw new CommandException(ex); 192 } 193 catch (Exception ex) { 194 throw new CommandException(ErrorCode.E0803, ex); 195 } 196 } 197 198 private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException { 199 String jobSlaXml = ""; 200 // Validate WF job 201 Element eSla = eWfJob.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI)); 202 if (eSla != null) { 203 jobSlaXml = resolveSla(eSla, evalSla); 204 } 205 206 // Validate all actions 207 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) { 208 eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI)); 209 if (eSla != null) { 210 resolveSla(eSla, evalSla); 211 } 212 } 213 return jobSlaXml; 214 } 215 216 private void writeSLARegistration(String slaXml, String id, String user, String group, XLog log) 217 throws CommandException { 218 try { 219 if (slaXml != null && slaXml.length() > 0) { 220 Element eSla = XmlUtils.parseXml(slaXml); 221 SLADbOperations.writeSlaRegistrationEvent(eSla, id, SlaAppType.WORKFLOW_JOB, user, group, log); 222 } 223 } 224 catch (Exception e) { 225 e.printStackTrace(); 226 throw new CommandException(ErrorCode.E1007, "workflow " + id, e); 227 } 228 } 229 230 /** 231 * Resolve variables in sla xml element. 232 * 233 * @param eSla sla xml element 234 * @param evalSla sla evaluator 235 * @return sla xml string after evaluation 236 * @throws CommandException 237 */ 238 public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException { 239 // EL evaluation 240 String slaXml = XmlUtils.prettyPrint(eSla).toString(); 241 try { 242 slaXml = XmlUtils.removeComments(slaXml); 243 slaXml = evalSla.evaluate(slaXml, String.class); 244 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL); 245 return slaXml; 246 } 247 catch (Exception e) { 248 throw new CommandException(ErrorCode.E1004, "Validation erro :" + e.getMessage(), e); 249 } 250 } 251 252 /** 253 * Create an EL evaluator for a given group. 254 * 255 * @param conf configuration variable 256 * @param group group variable 257 * @return the evaluator created for the group 258 */ 259 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) { 260 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group); 261 for (Map.Entry<String, String> entry : conf) { 262 eval.setVariable(entry.getKey(), entry.getValue()); 263 } 264 return eval; 265 } 266 267 @Override 268 protected String getEntityKey() { 269 return null; 270 } 271 272 @Override 273 protected boolean isLockRequired() { 274 return false; 275 } 276 277 @Override 278 protected void loadState() { 279 280 } 281 282 @Override 283 protected void verifyPrecondition() throws CommandException { 284 285 } 286 287 }