001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.wf; 019 020import org.apache.hadoop.conf.Configuration; 021import org.apache.oozie.WorkflowJobBean; 022import org.apache.oozie.ErrorCode; 023import org.apache.oozie.service.JPAService; 024import org.apache.oozie.service.WorkflowStoreService; 025import org.apache.oozie.service.WorkflowAppService; 026import org.apache.oozie.service.Services; 027import org.apache.oozie.service.DagXLogInfoService; 028import org.apache.oozie.util.InstrumentUtils; 029import org.apache.oozie.util.LogUtils; 030import org.apache.oozie.util.XLog; 031import org.apache.oozie.util.ParamChecker; 032import org.apache.oozie.util.XConfiguration; 033import org.apache.oozie.util.XmlUtils; 034import org.apache.oozie.command.CommandException; 035import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor; 036import org.apache.oozie.store.StoreException; 037import org.apache.oozie.workflow.WorkflowApp; 038import org.apache.oozie.workflow.WorkflowException; 039import org.apache.oozie.workflow.WorkflowInstance; 040import org.apache.oozie.workflow.WorkflowLib; 041import org.apache.oozie.util.PropertiesUtils; 042import org.apache.oozie.client.OozieClient; 043import org.apache.oozie.client.WorkflowJob; 044import org.apache.oozie.client.XOozieClient; 045import org.jdom.Element; 046import org.jdom.Namespace; 047 048import java.util.Date; 049import java.util.List; 050import java.util.Map; 051import java.util.Set; 052import java.util.HashSet; 053 054public abstract class SubmitHttpXCommand extends WorkflowXCommand<String> { 055 056 protected static final Set<String> MANDATORY_OOZIE_CONFS = new HashSet<String>(); 057 protected static final Set<String> OPTIONAL_OOZIE_CONFS = new HashSet<String>(); 058 059 static { 060 MANDATORY_OOZIE_CONFS.add(XOozieClient.JT); 061 MANDATORY_OOZIE_CONFS.add(XOozieClient.NN); 062 MANDATORY_OOZIE_CONFS.add(OozieClient.LIBPATH); 063 064 OPTIONAL_OOZIE_CONFS.add(XOozieClient.FILES); 065 OPTIONAL_OOZIE_CONFS.add(XOozieClient.ARCHIVES); 066 } 067 068 private Configuration conf; 069 070 public SubmitHttpXCommand(String name, String type, Configuration conf) { 071 super(name, type, 1); 072 this.conf = ParamChecker.notNull(conf, "conf"); 073 } 074 075 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 076 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 077 078 static { 079 String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES, 080 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB, 081 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, 082 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 083 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 084 085 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER}; 086 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 087 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 088 } 089 090 abstract protected Element generateSection(Configuration conf, Namespace ns); 091 092 abstract protected Namespace getSectionNamespace(); 093 094 abstract protected String getWorkflowName(); 095 096 protected void checkMandatoryConf(Configuration conf) { 097 for (String key : MANDATORY_OOZIE_CONFS) { 098 String value = conf.get(key); 099 if (value == null) { 100 throw new RuntimeException(key + " is not specified"); 101 } 102 } 103 } 104 105 protected Namespace getWorkflowNamespace() { 106 return Namespace.getNamespace("uri:oozie:workflow:0.2"); 107 } 108 /** 109 * Generate workflow xml from conf object 110 * 111 * @param conf the configuration object 112 * @return workflow xml def string representation 113 */ 114 protected String getWorkflowXml(Configuration conf) { 115 checkMandatoryConf(conf); 116 117 Namespace ns = getWorkflowNamespace(); 118 Element root = new Element("workflow-app", ns); 119 String name = getWorkflowName(); 120 root.setAttribute("name", "oozie-" + name); 121 122 Element start = new Element("start", ns); 123 String nodeName = name + "1"; 124 start.setAttribute("to", nodeName); 125 root.addContent(start); 126 127 Element action = new Element("action", ns); 128 action.setAttribute("name", nodeName); 129 130 Element ele = generateSection(conf, getSectionNamespace()); 131 action.addContent(ele); 132 133 Element ok = new Element("ok", ns); 134 ok.setAttribute("to", "end"); 135 action.addContent(ok); 136 137 Element error = new Element("error", ns); 138 error.setAttribute("to", "fail"); 139 action.addContent(error); 140 141 root.addContent(action); 142 143 Element kill = new Element("kill", ns); 144 kill.setAttribute("name", "fail"); 145 Element message = new Element("message", ns); 146 message.addContent(name + " failed, error message[${wf:errorMessage(wf:lastErrorNode())}]"); 147 kill.addContent(message); 148 root.addContent(kill); 149 150 Element end = new Element("end", ns); 151 end.setAttribute("name", "end"); 152 root.addContent(end); 153 154 return XmlUtils.prettyPrint(root).toString(); 155 }; 156 157 protected Element generateConfigurationSection(List<String> Dargs, Namespace ns) { 158 Element configuration = new Element("configuration", ns); 159 for (String arg : Dargs) { 160 String name = null, value = null; 161 int pos = arg.indexOf("="); 162 if (pos == -1) { // "-D<name>" or "-D" only 163 name = arg.substring(2, arg.length()); 164 value = ""; 165 } 166 else { // "-D<name>=<value>" 167 name = arg.substring(2, pos); 168 value = arg.substring(pos + 1, arg.length()); 169 } 170 171 Element property = new Element("property", ns); 172 Element nameElement = new Element("name", ns); 173 nameElement.addContent(name); 174 property.addContent(nameElement); 175 Element valueElement = new Element("value", ns); 176 valueElement.addContent(value); 177 property.addContent(valueElement); 178 configuration.addContent(property); 179 } 180 181 return configuration; 182 } 183 184 /* (non-Javadoc) 185 * @see org.apache.oozie.command.XCommand#execute() 186 */ 187 @Override 188 protected String execute() throws CommandException { 189 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 190 WorkflowAppService wps = Services.get().get(WorkflowAppService.class); 191 try { 192 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN)); 193 String wfXml = getWorkflowXml(conf); 194 LOG.debug("workflow xml created on the server side is :\n"); 195 LOG.debug(wfXml); 196 WorkflowApp app = wps.parseDef(wfXml, conf); 197 XConfiguration protoActionConf = wps.createProtoActionConf(conf, false); 198 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB(); 199 200 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 201 202 // Resolving all variables in the job properties. 203 // This ensures the Hadoop Configuration semantics is preserved. 204 XConfiguration resolvedVarsConf = new XConfiguration(); 205 for (Map.Entry<String, String> entry : conf) { 206 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 207 } 208 conf = resolvedVarsConf; 209 210 WorkflowInstance wfInstance; 211 try { 212 wfInstance = workflowLib.createInstance(app, conf); 213 } 214 catch (WorkflowException e) { 215 throw new StoreException(e); 216 } 217 218 Configuration conf = wfInstance.getConf(); 219 220 WorkflowJobBean workflow = new WorkflowJobBean(); 221 workflow.setId(wfInstance.getId()); 222 workflow.setAppName(app.getName()); 223 workflow.setAppPath(conf.get(OozieClient.APP_PATH)); 224 workflow.setConf(XmlUtils.prettyPrint(conf).toString()); 225 workflow.setProtoActionConf(protoActionConf.toXmlString()); 226 workflow.setCreatedTime(new Date()); 227 workflow.setLastModifiedTime(new Date()); 228 workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, "")); 229 workflow.setStatus(WorkflowJob.Status.PREP); 230 workflow.setRun(0); 231 workflow.setUser(conf.get(OozieClient.USER_NAME)); 232 workflow.setGroup(conf.get(OozieClient.GROUP_NAME)); 233 workflow.setWorkflowInstance(wfInstance); 234 workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID)); 235 236 LogUtils.setLogInfo(workflow, logInfo); 237 JPAService jpaService = Services.get().get(JPAService.class); 238 if (jpaService != null) { 239 jpaService.execute(new WorkflowJobInsertJPAExecutor(workflow)); 240 } 241 else { 242 LOG.error(ErrorCode.E0610); 243 return null; 244 } 245 246 return workflow.getId(); 247 } 248 catch (WorkflowException ex) { 249 throw new CommandException(ex); 250 } 251 catch (Exception ex) { 252 throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex); 253 } 254 } 255 256 static private void addSection(Element X, Namespace ns, String filesStr, String tagName) { 257 if (filesStr != null) { 258 String[] files = filesStr.split(","); 259 for (String f : files) { 260 Element tagElement = new Element(tagName, ns); 261 if (f.contains("#")) { 262 tagElement.addContent(f); 263 } 264 else { 265 String filename = f.substring(f.lastIndexOf("/") + 1, f.length()); 266 if (filename == null || filename.isEmpty()) { 267 tagElement.addContent(f); 268 } 269 else { 270 tagElement.addContent(f + "#" + filename); 271 } 272 } 273 X.addContent(tagElement); 274 } 275 } 276 } 277 278 /** 279 * Add file section in X. 280 * 281 * @param parent XML element to be appended 282 * @param conf Configuration object 283 * @param ns XML element namespace 284 */ 285 static void addFileSection(Element X, Configuration conf, Namespace ns) { 286 String filesStr = conf.get(XOozieClient.FILES); 287 addSection(X, ns, filesStr, "file"); 288 } 289 290 /** 291 * Add archive section in X. 292 * 293 * @param parent XML element to be appended 294 * @param conf Configuration object 295 * @param ns XML element namespace 296 */ 297 static void addArchiveSection(Element X, Configuration conf, Namespace ns) { 298 String archivesStr = conf.get(XOozieClient.ARCHIVES); 299 addSection(X, ns, archivesStr, "archive"); 300 } 301}