/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.store.WorkflowStore;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.WorkflowLib;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.XOozieClient;
import org.jdom.Element;
import org.jdom.Namespace;

import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;

/**
 * Base class for submit commands that build the workflow definition on the server side from a job
 * configuration (see {@link #getWorkflowXml(Configuration)}), create a workflow instance from it and
 * insert the resulting job into the store in {@code PREP} status.
 * <p/>
 * The command result is the id of the newly inserted workflow job.
 */
public abstract class SubmitHttpCommand extends WorkflowCommand<String> {
    /**
     * Server configuration key, read as a boolean defaulting to {@code false}. When it is {@code true}
     * and the job configuration does not set {@link OozieClient#USE_SYSTEM_LIBPATH}, that job property
     * is set to {@code true} during submission.
     */
    public static final String USE_SYSTEM_LIBPATH_FOR_MR_PIG_JOBS = "use.system.libpath.for.mapreduce.and.pig.jobs";

    /** Holds {@link XOozieClient#JT} and {@link XOozieClient#NN}; not read in this class (presumably for subclasses). */
    protected static final Set<String> MANDATORY_OOZIE_CONFS = new HashSet<String>();

    /** Holds {@link XOozieClient#FILES} and {@link XOozieClient#ARCHIVES}; not read in this class (presumably for subclasses). */
    protected static final Set<String> OPTIONAL_OOZIE_CONFS = new HashSet<String>();

    static {
        MANDATORY_OOZIE_CONFS.add(XOozieClient.JT);
        MANDATORY_OOZIE_CONFS.add(XOozieClient.NN);

        OPTIONAL_OOZIE_CONFS.add(XOozieClient.FILES);
        OPTIONAL_OOZIE_CONFS.add(XOozieClient.ARCHIVES);
    }

    // NOTE(review): populated below but never read in this class; kept as-is.
    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();

    /** Property names checked against the job configuration in {@link #call(WorkflowStore)}. */
    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();

    static {
        String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);

        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
                WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME };
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
    }

    // Not final: call() replaces it with a copy whose variables have been resolved.
    private Configuration conf;
    private final String authToken;

    /**
     * Create the command.
     *
     * @param name command name, passed to the superclass
     * @param type command type, passed to the superclass
     * @param conf job configuration; checked with {@code ParamChecker.notNull}
     * @param authToken authentication token; checked with {@code ParamChecker.notEmpty}
     */
    public SubmitHttpCommand(String name, String type, Configuration conf, String authToken) {
        super(name, type, 1, XLog.STD);
        this.conf = ParamChecker.notNull(conf, "conf");
        this.authToken = ParamChecker.notEmpty(authToken, "authToken");
    }

    /**
     * Generate workflow xml from conf object
     *
     * @param conf the configuration object
     * @return workflow xml def string representation
     */
    protected abstract String getWorkflowXml(Configuration conf);

    /**
     * Build the workflow definition from the job configuration, create a workflow instance and insert
     * the corresponding job bean into the store with status {@code PREP} and run number 0.
     *
     * @param store workflow store the new job is inserted into
     * @return the id of the inserted workflow job
     * @throws StoreException declared by the command contract
     * @throws CommandException wrapping a {@link WorkflowException} raised directly in the outer try
     *         block, or with error code {@code E0803} wrapping any other exception
     */
    @Override
    protected String call(WorkflowStore store) throws StoreException, CommandException {
        incrJobCounter(1);
        WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
        try {
            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
            String wfXml = getWorkflowXml(conf);
            XLog.getLog(getClass()).debug("workflow xml created on the server side is :\n");
            XLog.getLog(getClass()).debug(wfXml);
            WorkflowApp app = wps.parseDef(wfXml);

            // Fall back to the server-wide setting only when the job did not state a preference.
            if (conf.get(OozieClient.USE_SYSTEM_LIBPATH) == null) {
                if (Services.get().getConf().getBoolean(USE_SYSTEM_LIBPATH_FOR_MR_PIG_JOBS, false)) {
                    conf.setBoolean(OozieClient.USE_SYSTEM_LIBPATH, true);
                }
            }

            XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, false);
            WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();

            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);

            // Resolving all variables in the job properties.
            // This ensures the Hadoop Configuration semantics is preserved.
            XConfiguration resolvedVarsConf = new XConfiguration();
            for (Map.Entry<String, String> entry : conf) {
                // Read through conf.get() rather than entry.getValue(), as the original code does.
                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
            }
            conf = resolvedVarsConf;

            WorkflowInstance wfInstance;
            try {
                wfInstance = workflowLib.createInstance(app, conf);
            }
            catch (WorkflowException e) {
                // NOTE(review): thrown inside the outer try, so this appears to be re-wrapped by the
                // catch (Exception) below and to reach callers as CommandException(E0803).
                throw new StoreException(e);
            }

            // Distinct name on purpose: this is the instance's configuration, not the 'conf' field.
            Configuration instanceConf = wfInstance.getConf();

            WorkflowJobBean workflow = new WorkflowJobBean();
            workflow.setId(wfInstance.getId());
            workflow.setAppName(app.getName());
            workflow.setAppPath(instanceConf.get(OozieClient.APP_PATH));
            workflow.setConf(XmlUtils.prettyPrint(instanceConf).toString());
            workflow.setProtoActionConf(protoActionConf.toXmlString());
            workflow.setCreatedTime(new Date());
            workflow.setLastModifiedTime(new Date());
            workflow.setLogToken(instanceConf.get(OozieClient.LOG_TOKEN, ""));
            workflow.setStatus(WorkflowJob.Status.PREP);
            workflow.setRun(0);
            workflow.setUser(instanceConf.get(OozieClient.USER_NAME));
            workflow.setGroup(instanceConf.get(OozieClient.GROUP_NAME));
            workflow.setAuthToken(authToken);
            workflow.setWorkflowInstance(wfInstance);
            workflow.setExternalId(instanceConf.get(OozieClient.EXTERNAL_ID));

            setLogInfo(workflow);
            store.insertWorkflow(workflow);

            return workflow.getId();
        }
        catch (WorkflowException ex) {
            throw new CommandException(ex);
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E0803, ex);
        }
    }

    /**
     * Append one child element per comma-separated entry of {@code filesStr} to {@code X}.
     * <p/>
     * An entry that already contains {@code #}, or whose part after the last {@code /} is empty, is
     * used verbatim as the element content. Otherwise the content is the entry followed by {@code #}
     * and the part after its last {@code /}.
     *
     * @param X XML element the new elements are appended to
     * @param ns namespace of the new elements
     * @param filesStr comma-separated entries; nothing is added when {@code null}
     * @param tagName name of the elements to create
     */
    private static void addSection(Element X, Namespace ns, String filesStr, String tagName) {
        if (filesStr == null) {
            return;
        }
        for (String f : filesStr.split(",")) {
            Element tagElement = new Element(tagName, ns);
            // lastIndexOf yields -1 when there is no '/', so the whole entry is taken as the name.
            String filename = f.substring(f.lastIndexOf("/") + 1);
            if (f.contains("#") || filename.isEmpty()) {
                tagElement.addContent(f);
            }
            else {
                tagElement.addContent(f + "#" + filename);
            }
            X.addContent(tagElement);
        }
    }

    /**
     * Add file section in X.
     *
     * @param X XML element to be appended
     * @param conf Configuration object
     * @param ns XML element namespace
     */
    static void addFileSection(Element X, Configuration conf, Namespace ns) {
        String filesStr = conf.get(XOozieClient.FILES);
        addSection(X, ns, filesStr, "file");
    }

    /**
     * Add archive section in X.
     *
     * @param X XML element to be appended
     * @param conf Configuration object
     * @param ns XML element namespace
     */
    static void addArchiveSection(Element X, Configuration conf, Namespace ns) {
        String archivesStr = conf.get(XOozieClient.ARCHIVES);
        addSection(X, ns, archivesStr, "archive");
    }
}