001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.wf; 020 021import org.apache.hadoop.conf.Configuration; 022import org.apache.oozie.WorkflowJobBean; 023import org.apache.oozie.ErrorCode; 024import org.apache.oozie.service.JPAService; 025import org.apache.oozie.service.WorkflowStoreService; 026import org.apache.oozie.service.WorkflowAppService; 027import org.apache.oozie.service.Services; 028import org.apache.oozie.service.DagXLogInfoService; 029import org.apache.oozie.util.InstrumentUtils; 030import org.apache.oozie.util.LogUtils; 031import org.apache.oozie.util.XLog; 032import org.apache.oozie.util.ParamChecker; 033import org.apache.oozie.util.XConfiguration; 034import org.apache.oozie.util.XmlUtils; 035import org.apache.oozie.command.CommandException; 036import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor; 037import org.apache.oozie.store.StoreException; 038import org.apache.oozie.workflow.WorkflowApp; 039import org.apache.oozie.workflow.WorkflowException; 040import org.apache.oozie.workflow.WorkflowInstance; 041import org.apache.oozie.workflow.WorkflowLib; 042import org.apache.oozie.util.PropertiesUtils; 043import org.apache.oozie.client.OozieClient; 044import org.apache.oozie.client.WorkflowJob; 045import org.apache.oozie.client.XOozieClient; 046import org.jdom.Element; 047import org.jdom.Namespace; 048 049import java.util.Date; 050import java.util.List; 051import java.util.Map; 052import java.util.Set; 053import java.util.HashSet; 054 055public abstract class SubmitHttpXCommand extends WorkflowXCommand<String> { 056 057 protected static final Set<String> MANDATORY_OOZIE_CONFS = new HashSet<String>(); 058 protected static final Set<String> OPTIONAL_OOZIE_CONFS = new HashSet<String>(); 059 060 static { 061 MANDATORY_OOZIE_CONFS.add(XOozieClient.JT); 062 MANDATORY_OOZIE_CONFS.add(XOozieClient.NN); 063 MANDATORY_OOZIE_CONFS.add(OozieClient.LIBPATH); 064 065 OPTIONAL_OOZIE_CONFS.add(XOozieClient.FILES); 066 OPTIONAL_OOZIE_CONFS.add(XOozieClient.ARCHIVES); 067 } 068 069 private Configuration conf; 070 071 public SubmitHttpXCommand(String name, String type, Configuration conf) { 072 super(name, type, 1); 073 this.conf = ParamChecker.notNull(conf, "conf"); 074 } 075 076 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 077 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 078 079 static { 080 String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES, 081 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB, 082 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, 083 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 084 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 085 086 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER}; 087 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 088 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 089 } 090 091 abstract protected Element generateSection(Configuration conf, Namespace ns); 092 093 abstract protected Namespace getSectionNamespace(); 094 095 abstract protected String getWorkflowName(); 096 097 protected void checkMandatoryConf(Configuration conf) { 098 for (String key : MANDATORY_OOZIE_CONFS) { 099 String value = conf.get(key); 100 if (value == null) { 101 throw new RuntimeException(key + " is not specified"); 102 } 103 } 104 } 105 106 protected Namespace getWorkflowNamespace() { 107 return Namespace.getNamespace("uri:oozie:workflow:0.2"); 108 } 109 /** 110 * Generate workflow xml from conf object 111 * 112 * @param conf the configuration object 113 * @return workflow xml def string representation 114 */ 115 protected String getWorkflowXml(Configuration conf) { 116 checkMandatoryConf(conf); 117 118 Namespace ns = getWorkflowNamespace(); 119 Element root = new Element("workflow-app", ns); 120 String name = getWorkflowName(); 121 root.setAttribute("name", "oozie-" + name); 122 123 Element start = new Element("start", ns); 124 String nodeName = name + "1"; 125 start.setAttribute("to", nodeName); 126 root.addContent(start); 127 128 Element action = new Element("action", ns); 129 action.setAttribute("name", nodeName); 130 131 Element ele = generateSection(conf, getSectionNamespace()); 132 action.addContent(ele); 133 134 Element ok = new Element("ok", ns); 135 ok.setAttribute("to", "end"); 136 action.addContent(ok); 137 138 Element error = new Element("error", ns); 139 error.setAttribute("to", "fail"); 140 action.addContent(error); 141 142 root.addContent(action); 143 144 Element kill = new Element("kill", ns); 145 kill.setAttribute("name", "fail"); 146 Element message = new Element("message", ns); 147 message.addContent(name + " failed, error message[${wf:errorMessage(wf:lastErrorNode())}]"); 148 kill.addContent(message); 149 root.addContent(kill); 150 151 Element end = new Element("end", ns); 152 end.setAttribute("name", "end"); 153 root.addContent(end); 154 155 return XmlUtils.prettyPrint(root).toString(); 156 }; 157 158 protected Element generateConfigurationSection(List<String> Dargs, Namespace ns) { 159 Element configuration = new Element("configuration", ns); 160 for (String arg : Dargs) { 161 String name = null, value = null; 162 int pos = arg.indexOf("="); 163 if (pos == -1) { // "-D<name>" or "-D" only 164 name = arg.substring(2, arg.length()); 165 value = ""; 166 } 167 else { // "-D<name>=<value>" 168 name = arg.substring(2, pos); 169 value = arg.substring(pos + 1, arg.length()); 170 } 171 172 Element property = new Element("property", ns); 173 Element nameElement = new Element("name", ns); 174 nameElement.addContent(name); 175 property.addContent(nameElement); 176 Element valueElement = new Element("value", ns); 177 valueElement.addContent(value); 178 property.addContent(valueElement); 179 configuration.addContent(property); 180 } 181 182 return configuration; 183 } 184 185 /* (non-Javadoc) 186 * @see org.apache.oozie.command.XCommand#execute() 187 */ 188 @Override 189 protected String execute() throws CommandException { 190 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 191 WorkflowAppService wps = Services.get().get(WorkflowAppService.class); 192 try { 193 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN)); 194 String wfXml = getWorkflowXml(conf); 195 LOG.debug("workflow xml created on the server side is :\n"); 196 LOG.debug(wfXml); 197 WorkflowApp app = wps.parseDef(wfXml, conf); 198 XConfiguration protoActionConf = wps.createProtoActionConf(conf, false); 199 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB(); 200 201 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 202 203 // Resolving all variables in the job properties. 204 // This ensures the Hadoop Configuration semantics is preserved. 205 XConfiguration resolvedVarsConf = new XConfiguration(); 206 for (Map.Entry<String, String> entry : conf) { 207 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 208 } 209 conf = resolvedVarsConf; 210 211 WorkflowInstance wfInstance; 212 try { 213 wfInstance = workflowLib.createInstance(app, conf); 214 } 215 catch (WorkflowException e) { 216 throw new StoreException(e); 217 } 218 219 Configuration conf = wfInstance.getConf(); 220 221 WorkflowJobBean workflow = new WorkflowJobBean(); 222 workflow.setId(wfInstance.getId()); 223 workflow.setAppName(app.getName()); 224 workflow.setAppPath(conf.get(OozieClient.APP_PATH)); 225 workflow.setConf(XmlUtils.prettyPrint(conf).toString()); 226 workflow.setProtoActionConf(protoActionConf.toXmlString()); 227 workflow.setCreatedTime(new Date()); 228 workflow.setLastModifiedTime(new Date()); 229 workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, "")); 230 workflow.setStatus(WorkflowJob.Status.PREP); 231 workflow.setRun(0); 232 workflow.setUser(conf.get(OozieClient.USER_NAME)); 233 workflow.setGroup(conf.get(OozieClient.GROUP_NAME)); 234 workflow.setWorkflowInstance(wfInstance); 235 workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID)); 236 237 LogUtils.setLogInfo(workflow); 238 JPAService jpaService = Services.get().get(JPAService.class); 239 if (jpaService != null) { 240 jpaService.execute(new WorkflowJobInsertJPAExecutor(workflow)); 241 } 242 else { 243 LOG.error(ErrorCode.E0610); 244 return null; 245 } 246 247 return workflow.getId(); 248 } 249 catch (WorkflowException ex) { 250 throw new CommandException(ex); 251 } 252 catch (Exception ex) { 253 throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex); 254 } 255 } 256 257 static private void addSection(Element X, Namespace ns, String filesStr, String tagName) { 258 if (filesStr != null) { 259 String[] files = filesStr.split(","); 260 for (String f : files) { 261 Element tagElement = new Element(tagName, ns); 262 if (f.contains("#")) { 263 tagElement.addContent(f); 264 } 265 else { 266 String filename = f.substring(f.lastIndexOf("/") + 1, f.length()); 267 if (filename == null || filename.isEmpty()) { 268 tagElement.addContent(f); 269 } 270 else { 271 tagElement.addContent(f + "#" + filename); 272 } 273 } 274 X.addContent(tagElement); 275 } 276 } 277 } 278 279 /** 280 * Add file section in X. 281 * 282 * @param parent XML element to be appended 283 * @param conf Configuration object 284 * @param ns XML element namespace 285 */ 286 static void addFileSection(Element X, Configuration conf, Namespace ns) { 287 String filesStr = conf.get(XOozieClient.FILES); 288 addSection(X, ns, filesStr, "file"); 289 } 290 291 /** 292 * Add archive section in X. 293 * 294 * @param parent XML element to be appended 295 * @param conf Configuration object 296 * @param ns XML element namespace 297 */ 298 static void addArchiveSection(Element X, Configuration conf, Namespace ns) { 299 String archivesStr = conf.get(XOozieClient.ARCHIVES); 300 addSection(X, ns, archivesStr, "archive"); 301 } 302}