001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.hadoop.fs.Path; 022 import org.apache.hadoop.fs.FileSystem; 023 import org.apache.oozie.WorkflowJobBean; 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.service.HadoopAccessorException; 026 import org.apache.oozie.service.WorkflowStoreService; 027 import org.apache.oozie.service.WorkflowAppService; 028 import org.apache.oozie.service.HadoopAccessorService; 029 import org.apache.oozie.service.Services; 030 import org.apache.oozie.service.DagXLogInfoService; 031 import org.apache.oozie.util.XLog; 032 import org.apache.oozie.util.ParamChecker; 033 import org.apache.oozie.util.XConfiguration; 034 import org.apache.oozie.util.XmlUtils; 035 import org.apache.oozie.command.Command; 036 import org.apache.oozie.command.CommandException; 037 import org.apache.oozie.command.coord.CoordSubmitCommand; 038 import org.apache.oozie.coord.CoordELFunctions; 039 import org.apache.oozie.coord.CoordinatorJobException; 040 import org.apache.oozie.service.ELService; 041 import org.apache.oozie.service.SchemaService; 042 import org.apache.oozie.service.WorkflowAppService; 043 import org.apache.oozie.service.DagXLogInfoService; 044 import org.apache.oozie.service.WorkflowStoreService; 045 import org.apache.oozie.store.StoreException; 046 import org.apache.oozie.store.Store; 047 import org.apache.oozie.store.WorkflowStore; 048 import org.apache.oozie.workflow.WorkflowApp; 049 import org.apache.oozie.workflow.WorkflowException; 050 import org.apache.oozie.workflow.WorkflowInstance; 051 import org.apache.oozie.workflow.WorkflowLib; 052 import org.apache.oozie.util.ELEvaluator; 053 import org.apache.oozie.util.ParamChecker; 054 import org.apache.oozie.util.PropertiesUtils; 055 import org.apache.oozie.util.XLog; 056 import org.apache.oozie.util.XmlUtils; 057 import org.apache.oozie.util.XConfiguration; 058 import org.apache.oozie.util.db.SLADbOperations; 059 import org.apache.oozie.service.Services; 060 import org.apache.oozie.service.SchemaService.SchemaName; 061 import org.apache.oozie.client.OozieClient; 062 import org.apache.oozie.client.WorkflowJob; 063 import org.apache.oozie.client.SLAEvent.SlaAppType; 064 import org.jdom.Element; 065 import org.jdom.JDOMException; 066 import org.jdom.Namespace; 067 068 import java.util.Date; 069 import java.util.List; 070 import java.util.Map; 071 import java.util.Set; 072 import java.util.HashSet; 073 import java.util.Map; 074 import java.io.IOException; 075 076 public class SubmitCommand extends WorkflowCommand<String> { 077 public static final String CONFIG_DEFAULT = "config-default.xml"; 078 079 private Configuration conf; 080 private String authToken; 081 082 public SubmitCommand(Configuration conf, String authToken) { 083 super("submit", "submit", 1, XLog.STD); 084 this.conf = ParamChecker.notNull(conf, "conf"); 085 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 086 } 087 088 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 089 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 090 091 static { 092 String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES, 093 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB, 094 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, 095 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS}; 096 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 097 098 String[] badDefaultProps = {PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI, 099 WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME}; 100 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 101 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 102 } 103 104 @Override 105 protected String call(WorkflowStore store) throws StoreException, CommandException { 106 incrJobCounter(1); 107 WorkflowAppService wps = Services.get().get(WorkflowAppService.class); 108 try { 109 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN)); 110 WorkflowApp app = wps.parseDef(conf, authToken); 111 XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true); 112 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB(); 113 114 Path configDefault = new Path(new Path(conf.get(OozieClient.APP_PATH)).getParent(), CONFIG_DEFAULT); 115 String user = conf.get(OozieClient.USER_NAME); 116 String group = conf.get(OozieClient.GROUP_NAME); 117 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, 118 configDefault.toUri(), new Configuration()); 119 120 if (fs.exists(configDefault)) { 121 try { 122 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 123 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 124 XConfiguration.injectDefaults(defaultConf, conf); 125 } 126 catch (IOException ex) { 127 throw new IOException("default configuration file, " + ex.getMessage(), ex); 128 } 129 } 130 131 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 132 133 // Resolving all variables in the job properties. 134 // This ensures the Hadoop Configuration semantics is preserved. 135 XConfiguration resolvedVarsConf = new XConfiguration(); 136 for (Map.Entry<String, String> entry : conf) { 137 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 138 } 139 conf = resolvedVarsConf; 140 141 WorkflowInstance wfInstance; 142 try { 143 wfInstance = workflowLib.createInstance(app, conf); 144 } 145 catch (WorkflowException e) { 146 throw new StoreException(e); 147 } 148 149 Configuration conf = wfInstance.getConf(); 150 // System.out.println("WF INSTANCE CONF:"); 151 // System.out.println(XmlUtils.prettyPrint(conf).toString()); 152 153 WorkflowJobBean workflow = new WorkflowJobBean(); 154 workflow.setId(wfInstance.getId()); 155 workflow.setAppName(app.getName()); 156 workflow.setAppPath(conf.get(OozieClient.APP_PATH)); 157 workflow.setConf(XmlUtils.prettyPrint(conf).toString()); 158 workflow.setProtoActionConf(protoActionConf.toXmlString()); 159 workflow.setCreatedTime(new Date()); 160 workflow.setLastModifiedTime(new Date()); 161 workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, "")); 162 workflow.setStatus(WorkflowJob.Status.PREP); 163 workflow.setRun(0); 164 workflow.setUser(conf.get(OozieClient.USER_NAME)); 165 workflow.setGroup(conf.get(OozieClient.GROUP_NAME)); 166 workflow.setAuthToken(authToken); 167 workflow.setWorkflowInstance(wfInstance); 168 workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID)); 169 170 setLogInfo(workflow); 171 Element wfElem = XmlUtils.parseXml(app.getDefinition()); 172 ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit"); 173 String jobSlaXml = verifySlaElements(wfElem, evalSla); 174 writeSLARegistration(jobSlaXml, workflow.getId(), workflow.getUser(), workflow.getGroup(), store); 175 workflow.setSlaXml(jobSlaXml); 176 // System.out.println("SlaXml :"+ slaXml); 177 178 store.insertWorkflow(workflow); 179 180 // Configuration conf1 = workflow.getWorkflowInstance().getConf(); 181 // System.out.println("WF1 INSTANCE CONF:"); 182 // System.out.println(XmlUtils.prettyPrint(conf1).toString()); 183 // Add WF_JOB SLA Registration event 184 185 return workflow.getId(); 186 } 187 catch (WorkflowException ex) { 188 throw new CommandException(ex); 189 } 190 catch (HadoopAccessorException ex) { 191 throw new CommandException(ex); 192 } 193 catch (Exception ex) { 194 throw new CommandException(ErrorCode.E0803, ex); 195 } 196 } 197 198 private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException { 199 String jobSlaXml = ""; 200 // String prefix = XmlUtils.getNamespacePrefix(eWfJob, 201 // SchemaService.SLA_NAME_SPACE_URI); 202 // Validate WF job 203 Element eSla = eWfJob.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI)); 204 if (eSla != null) { 205 jobSlaXml = resolveSla(eSla, evalSla); 206 } 207 208 // Validate all actions 209 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) { 210 eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI)); 211 if (eSla != null) { 212 resolveSla(eSla, evalSla); 213 } 214 } 215 return jobSlaXml; 216 } 217 218 private void writeSLARegistration(String slaXml, String id, String user, String group, Store store) 219 throws CommandException { 220 try { 221 if (slaXml != null && slaXml.length() > 0) { 222 Element eSla = XmlUtils.parseXml(slaXml); 223 SLADbOperations.writeSlaRegistrationEvent(eSla, store, id, SlaAppType.WORKFLOW_JOB, user, group); 224 } 225 } 226 catch (Exception e) { 227 // TODO Auto-generated catch block 228 e.printStackTrace(); 229 throw new CommandException(ErrorCode.E1007, "workflow " + id, e); 230 } 231 } 232 233 public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException { 234 // EL evaluation 235 String slaXml = XmlUtils.prettyPrint(eSla).toString(); 236 try { 237 slaXml = XmlUtils.removeComments(slaXml); 238 slaXml = evalSla.evaluate(slaXml, String.class); 239 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL); 240 return slaXml; 241 } 242 catch (Exception e) { 243 throw new CommandException(ErrorCode.E1004, "Validation erro :" + e.getMessage(), e); 244 } 245 } 246 247 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) { 248 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group); 249 for (Map.Entry<String, String> entry : conf) { 250 eval.setVariable(entry.getKey(), entry.getValue()); 251 } 252 return eval; 253 } 254 255 }