001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.oozie.WorkflowJobBean; 022 import org.apache.oozie.ErrorCode; 023 import org.apache.oozie.service.JPAService; 024 import org.apache.oozie.service.WorkflowStoreService; 025 import org.apache.oozie.service.WorkflowAppService; 026 import org.apache.oozie.service.Services; 027 import org.apache.oozie.service.DagXLogInfoService; 028 import org.apache.oozie.util.InstrumentUtils; 029 import org.apache.oozie.util.LogUtils; 030 import org.apache.oozie.util.XLog; 031 import org.apache.oozie.util.ParamChecker; 032 import org.apache.oozie.util.XConfiguration; 033 import org.apache.oozie.util.XmlUtils; 034 import org.apache.oozie.command.CommandException; 035 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor; 036 import org.apache.oozie.store.StoreException; 037 import org.apache.oozie.workflow.WorkflowApp; 038 import org.apache.oozie.workflow.WorkflowException; 039 import org.apache.oozie.workflow.WorkflowInstance; 040 import org.apache.oozie.workflow.WorkflowLib; 041 import org.apache.oozie.util.PropertiesUtils; 042 import org.apache.oozie.client.OozieClient; 043 import org.apache.oozie.client.WorkflowJob; 044 import org.apache.oozie.client.XOozieClient; 045 import org.jdom.Element; 046 import org.jdom.Namespace; 047 048 import java.util.Date; 049 import java.util.Map; 050 import java.util.Set; 051 import java.util.HashSet; 052 053 public abstract class SubmitHttpXCommand extends WorkflowXCommand<String> { 054 055 protected static final Set<String> MANDATORY_OOZIE_CONFS = new HashSet<String>(); 056 protected static final Set<String> OPTIONAL_OOZIE_CONFS = new HashSet<String>(); 057 058 static { 059 MANDATORY_OOZIE_CONFS.add(XOozieClient.JT); 060 MANDATORY_OOZIE_CONFS.add(XOozieClient.NN); 061 MANDATORY_OOZIE_CONFS.add(OozieClient.LIBPATH); 062 063 OPTIONAL_OOZIE_CONFS.add(XOozieClient.FILES); 064 OPTIONAL_OOZIE_CONFS.add(XOozieClient.ARCHIVES); 065 } 066 067 private Configuration conf; 068 private String authToken; 069 070 public SubmitHttpXCommand(String name, String type, Configuration conf, String authToken) { 071 super(name, type, 1); 072 this.conf = ParamChecker.notNull(conf, "conf"); 073 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 074 } 075 076 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 077 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 078 079 static { 080 String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES, 081 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB, 082 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, 083 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 084 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 085 086 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI, 087 WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME }; 088 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 089 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 090 } 091 092 /** 093 * Generate workflow xml from conf object 094 * 095 * @param conf the configuration object 096 * @return workflow xml def string representation 097 */ 098 abstract protected String getWorkflowXml(Configuration conf); 099 100 /* (non-Javadoc) 101 * @see org.apache.oozie.command.XCommand#execute() 102 */ 103 @Override 104 protected String execute() throws CommandException { 105 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 106 WorkflowAppService wps = Services.get().get(WorkflowAppService.class); 107 try { 108 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN)); 109 String wfXml = getWorkflowXml(conf); 110 LOG.debug("workflow xml created on the server side is :\n"); 111 LOG.debug(wfXml); 112 WorkflowApp app = wps.parseDef(wfXml); 113 XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, false); 114 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB(); 115 116 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 117 118 // Resolving all variables in the job properties. 119 // This ensures the Hadoop Configuration semantics is preserved. 120 XConfiguration resolvedVarsConf = new XConfiguration(); 121 for (Map.Entry<String, String> entry : conf) { 122 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 123 } 124 conf = resolvedVarsConf; 125 126 WorkflowInstance wfInstance; 127 try { 128 wfInstance = workflowLib.createInstance(app, conf); 129 } 130 catch (WorkflowException e) { 131 throw new StoreException(e); 132 } 133 134 Configuration conf = wfInstance.getConf(); 135 136 WorkflowJobBean workflow = new WorkflowJobBean(); 137 workflow.setId(wfInstance.getId()); 138 workflow.setAppName(app.getName()); 139 workflow.setAppPath(conf.get(OozieClient.APP_PATH)); 140 workflow.setConf(XmlUtils.prettyPrint(conf).toString()); 141 workflow.setProtoActionConf(protoActionConf.toXmlString()); 142 workflow.setCreatedTime(new Date()); 143 workflow.setLastModifiedTime(new Date()); 144 workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, "")); 145 workflow.setStatus(WorkflowJob.Status.PREP); 146 workflow.setRun(0); 147 workflow.setUser(conf.get(OozieClient.USER_NAME)); 148 workflow.setGroup(conf.get(OozieClient.GROUP_NAME)); 149 workflow.setAuthToken(authToken); 150 workflow.setWorkflowInstance(wfInstance); 151 workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID)); 152 153 LogUtils.setLogInfo(workflow, logInfo); 154 JPAService jpaService = Services.get().get(JPAService.class); 155 if (jpaService != null) { 156 jpaService.execute(new WorkflowJobInsertJPAExecutor(workflow)); 157 } 158 else { 159 LOG.error(ErrorCode.E0610); 160 return null; 161 } 162 163 return workflow.getId(); 164 } 165 catch (WorkflowException ex) { 166 throw new CommandException(ex); 167 } 168 catch (Exception ex) { 169 throw new CommandException(ErrorCode.E0803, ex); 170 } 171 } 172 173 static private void addSection(Element X, Namespace ns, String filesStr, String tagName) { 174 if (filesStr != null) { 175 String[] files = filesStr.split(","); 176 for (String f : files) { 177 Element tagElement = new Element(tagName, ns); 178 if (f.contains("#")) { 179 tagElement.addContent(f); 180 } 181 else { 182 String filename = f.substring(f.lastIndexOf("/") + 1, f.length()); 183 if (filename == null || filename.isEmpty()) { 184 tagElement.addContent(f); 185 } 186 else { 187 tagElement.addContent(f + "#" + filename); 188 } 189 } 190 X.addContent(tagElement); 191 } 192 } 193 } 194 195 /** 196 * Add file section in X. 197 * 198 * @param parent XML element to be appended 199 * @param conf Configuration object 200 * @param ns XML element namespace 201 */ 202 static void addFileSection(Element X, Configuration conf, Namespace ns) { 203 String filesStr = conf.get(XOozieClient.FILES); 204 addSection(X, ns, filesStr, "file"); 205 } 206 207 /** 208 * Add archive section in X. 209 * 210 * @param parent XML element to be appended 211 * @param conf Configuration object 212 * @param ns XML element namespace 213 */ 214 static void addArchiveSection(Element X, Configuration conf, Namespace ns) { 215 String archivesStr = conf.get(XOozieClient.ARCHIVES); 216 addSection(X, ns, archivesStr, "archive"); 217 } 218 }