/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;
import org.apache.oozie.client.XOozieClient;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
 * Submit command that wraps the supplied job configuration in a generated
 * single-action workflow definition containing one <code>map-reduce</code> action.
 * <p>
 * The generated workflow has the shape
 * <code>start -&gt; hadoop1 (map-reduce) -&gt; end</code>, with a <code>fail</code>
 * kill node as the error transition.
 */
public class SubmitMRCommand extends SubmitHttpCommand {

    /**
     * Configuration keys that are never copied into the action's
     * <code>configuration</code> section. Populated once by the static
     * initializer below and only read afterwards.
     */
    private static final Set<String> SKIPPED_CONFS = new HashSet<String>();

    static {
        SKIPPED_CONFS.add(WorkflowAppService.HADOOP_USER);
        SKIPPED_CONFS.add(WorkflowAppService.HADOOP_UGI);
        SKIPPED_CONFS.add(XOozieClient.JT);
        SKIPPED_CONFS.add(XOozieClient.NN);
        SKIPPED_CONFS.add(WorkflowAppService.HADOOP_JT_KERBEROS_NAME);
        SKIPPED_CONFS.add(WorkflowAppService.HADOOP_NN_KERBEROS_NAME);
    }

    /**
     * Creates the command, passing the fixed name and type "submitMR" to the
     * superclass together with the given arguments.
     *
     * @param conf job configuration handed to the superclass
     * @param authToken authentication token handed to the superclass
     */
    public SubmitMRCommand(Configuration conf, String authToken) {
        super("submitMR", "submitMR", conf, authToken);
    }

    /**
     * Creates an element whose only content is the given text.
     *
     * @param tag element name
     * @param text text content; <code>null</code> is written as an empty string
     * @param ns namespace of the new element
     * @return the new element
     */
    private static Element createTextElement(String tag, String text, Namespace ns) {
        Element element = new Element(tag, ns);
        element.addContent(text != null ? text : "");
        return element;
    }

    /**
     * Builds the <code>configuration</code> element from every entry of
     * <code>conf</code> whose key is not filtered out.
     * <p>
     * A key is filtered out when it is contained in MANDATORY_OOZIE_CONFS,
     * OPTIONAL_OOZIE_CONFS (both declared outside this file, presumably on the
     * superclass) or {@link #SKIPPED_CONFS}.
     *
     * @param conf configuration to read the properties from
     * @param ns namespace for all created elements
     * @return the <code>configuration</code> element, or <code>null</code> when
     *         no entry survived the filter
     */
    private Element generateConfigurationSection(Configuration conf, Namespace ns) {
        // Created lazily so that an all-filtered configuration yields null
        // instead of an empty <configuration/> element.
        Element configuration = null;

        Iterator<Map.Entry<String, String>> entries = conf.iterator();
        while (entries.hasNext()) {
            Map.Entry<String, String> entry = entries.next();
            String name = entry.getKey();

            boolean filtered = MANDATORY_OOZIE_CONFS.contains(name)
                    || OPTIONAL_OOZIE_CONFS.contains(name)
                    || SKIPPED_CONFS.contains(name);
            if (filtered) {
                continue;
            }

            if (configuration == null) {
                configuration = new Element("configuration", ns);
            }

            Element property = new Element("property", ns);
            property.addContent(createTextElement("name", name, ns));
            property.addContent(createTextElement("value", entry.getValue(), ns));
            configuration.addContent(property);
        }

        return configuration;
    }

    /**
     * Builds the <code>map-reduce</code> action element: job tracker, name node
     * and, when the configuration holds more entries than there are mandatory
     * keys, the configuration, file and archive sections.
     *
     * @param conf configuration to read the values from
     * @param ns namespace for all created elements
     * @return the populated <code>map-reduce</code> element
     */
    private Element generateMRSection(Configuration conf, Namespace ns) {
        Element mapreduce = new Element("map-reduce", ns);

        Element jt = new Element("job-tracker", ns);
        jt.addContent(conf.get(XOozieClient.JT));
        mapreduce.addContent(jt);

        Element nn = new Element("name-node", ns);
        nn.addContent(conf.get(XOozieClient.NN));
        mapreduce.addContent(nn);

        // Only emit the optional sections when something besides the mandatory
        // entries is present (original note: excluding JT, NN, LIBPATH).
        if (conf.size() > MANDATORY_OOZIE_CONFS.size()) {
            // configuration section
            Element configuration = generateConfigurationSection(conf, ns);
            if (configuration != null) {
                mapreduce.addContent(configuration);
            }

            // file section (helper declared outside this file)
            addFileSection(mapreduce, conf, ns);

            // archive section (helper declared outside this file)
            addArchiveSection(mapreduce, conf, ns);
        }

        return mapreduce;
    }

    /**
     * Generates the workflow definition XML for the submitted map-reduce job.
     *
     * @param conf job configuration; must hold a non-null value for every key
     *        in MANDATORY_OOZIE_CONFS
     * @return the pretty-printed <code>workflow-app</code> document
     * @throws RuntimeException if a mandatory key has no value in <code>conf</code>
     * @see org.apache.oozie.command.wf.SubmitHttpCommand#getWorkflowXml(org.apache.hadoop.conf.Configuration)
     */
    @Override
    protected String getWorkflowXml(Configuration conf) {
        // Fail before building anything if a mandatory key is missing.
        for (String key : MANDATORY_OOZIE_CONFS) {
            if (conf.get(key) == null) {
                throw new RuntimeException(key + " is not specified");
            }
        }

        Namespace ns = Namespace.getNamespace("uri:oozie:workflow:0.2");
        Element root = new Element("workflow-app", ns);
        root.setAttribute("name", "oozie-mapreduce");

        // start -> hadoop1
        Element start = new Element("start", ns);
        start.setAttribute("to", "hadoop1");
        root.addContent(start);

        // hadoop1: the map-reduce action with its ok/error transitions
        Element action = new Element("action", ns);
        action.setAttribute("name", "hadoop1");
        action.addContent(generateMRSection(conf, ns));

        Element ok = new Element("ok", ns);
        ok.setAttribute("to", "end");
        action.addContent(ok);

        Element error = new Element("error", ns);
        error.setAttribute("to", "fail");
        action.addContent(error);

        root.addContent(action);

        // fail: kill node reached through the error transition
        Element kill = new Element("kill", ns);
        kill.setAttribute("name", "fail");
        Element message = new Element("message", ns);
        message.addContent("Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]");
        kill.addContent(message);
        root.addContent(kill);

        // end
        Element end = new Element("end", ns);
        end.setAttribute("name", "end");
        root.addContent(end);

        return XmlUtils.prettyPrint(root).toString();
    }
}