This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.oozie.service.WorkflowAppService;
022 import org.apache.oozie.util.XmlUtils;
023 import org.jdom.Element;
024 import org.jdom.Namespace;
025 import org.apache.oozie.client.XOozieClient;
026 import org.apache.oozie.command.CommandException;
027
028 import java.util.HashMap;
029 import java.util.HashSet;
030 import java.util.Iterator;
031 import java.util.Map;
032 import java.util.Set;
033
034 public class SubmitMRXCommand extends SubmitHttpXCommand {
035 private static final Set<String> SKIPPED_CONFS = new HashSet<String>();
036 private static final Map<String, String> DEPRECATE_MAP = new HashMap<String, String>();
037
038 public SubmitMRXCommand(Configuration conf) {
039 super("submitMR", "submitMR", conf);
040 }
041
042 static {
043 SKIPPED_CONFS.add(WorkflowAppService.HADOOP_USER);
044 SKIPPED_CONFS.add(XOozieClient.JT);
045 SKIPPED_CONFS.add(XOozieClient.NN);
046 // a brillant mind made a change in Configuration that 'fs.default.name' key gets converted to 'fs.defaultFS'
047 // in Hadoop 0.23, we need skip that one too, keeping the old one because of Hadoop 1
048 SKIPPED_CONFS.add(XOozieClient.NN_2);
049
050 DEPRECATE_MAP.put(XOozieClient.NN, XOozieClient.NN_2);
051 DEPRECATE_MAP.put(XOozieClient.JT, XOozieClient.JT_2);
052 DEPRECATE_MAP.put(WorkflowAppService.HADOOP_USER, "mapreduce.job.user.name");
053 }
054
055 private Element generateConfigurationSection(Configuration conf, Namespace ns) {
056 Element configuration = null;
057 Iterator<Map.Entry<String, String>> iter = conf.iterator();
058 while (iter.hasNext()) {
059 Map.Entry<String, String> entry = iter.next();
060 String name = entry.getKey();
061 if (MANDATORY_OOZIE_CONFS.contains(name) || OPTIONAL_OOZIE_CONFS.contains(name)
062 || SKIPPED_CONFS.contains(name)
063 || DEPRECATE_MAP.containsValue(name)) {
064 continue;
065 }
066
067 if (configuration == null) {
068 configuration = new Element("configuration", ns);
069 }
070
071 String value = entry.getValue();
072 Element property = new Element("property", ns);
073 Element nameElement = new Element("name", ns);
074 nameElement.addContent(name != null ? name : "");
075 property.addContent(nameElement);
076 Element valueElement = new Element("value", ns);
077 valueElement.addContent(value != null ? value : "");
078 property.addContent(valueElement);
079 configuration.addContent(property);
080 }
081
082 return configuration;
083 }
084
085 private Element generateMRSection(Configuration conf, Namespace ns) {
086 Element mapreduce = new Element("map-reduce", ns);
087 Element jt = new Element("job-tracker", ns);
088 String newJTVal = conf.get(DEPRECATE_MAP.get(XOozieClient.JT));
089 jt.addContent(newJTVal != null ? newJTVal : (conf.get(XOozieClient.JT)));
090 mapreduce.addContent(jt);
091 Element nn = new Element("name-node", ns);
092 String newNNVal = conf.get(DEPRECATE_MAP.get(XOozieClient.NN));
093 nn.addContent(newNNVal != null ? newNNVal : (conf.get(XOozieClient.NN)));
094 mapreduce.addContent(nn);
095
096 if (conf.size() > MANDATORY_OOZIE_CONFS.size()) { // excluding JT, NN,
097 // LIBPATH
098 // configuration section
099 Element configuration = generateConfigurationSection(conf, ns);
100 if (configuration != null) {
101 mapreduce.addContent(configuration);
102 }
103
104 // file section
105 addFileSection(mapreduce, conf, ns);
106
107 // archive section
108 addArchiveSection(mapreduce, conf, ns);
109 }
110
111 return mapreduce;
112 }
113
114 /*
115 * (non-Javadoc)
116 *
117 * @see
118 * org.apache.oozie.command.wf.SubmitHttpCommand#getWorkflowXml(org.apache
119 * .hadoop.conf.Configuration)
120 */
121 @Override
122 protected String getWorkflowXml(Configuration conf) {
123 for (String key : MANDATORY_OOZIE_CONFS) {
124 String value = conf.get(key);
125 if(value == null) {
126 if(DEPRECATE_MAP.containsKey(key)) {
127 if(conf.get(DEPRECATE_MAP.get(key)) == null) {
128 throw new RuntimeException(key + " or " + DEPRECATE_MAP.get(key) + " is not specified");
129 }
130 }
131 else {
132 throw new RuntimeException(key + " is not specified");
133 }
134 }
135 }
136
137 Namespace ns = Namespace.getNamespace("uri:oozie:workflow:0.2");
138 Element root = new Element("workflow-app", ns);
139 root.setAttribute("name", "oozie-mapreduce");
140
141 Element start = new Element("start", ns);
142 start.setAttribute("to", "hadoop1");
143 root.addContent(start);
144
145 Element action = new Element("action", ns);
146 action.setAttribute("name", "hadoop1");
147
148 Element mapreduce = generateMRSection(conf, ns);
149 action.addContent(mapreduce);
150
151 Element ok = new Element("ok", ns);
152 ok.setAttribute("to", "end");
153 action.addContent(ok);
154
155 Element error = new Element("error", ns);
156 error.setAttribute("to", "fail");
157 action.addContent(error);
158
159 root.addContent(action);
160
161 Element kill = new Element("kill", ns);
162 kill.setAttribute("name", "fail");
163 Element message = new Element("message", ns);
164 message.addContent("Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]");
165 kill.addContent(message);
166 root.addContent(kill);
167
168 Element end = new Element("end", ns);
169 end.setAttribute("name", "end");
170 root.addContent(end);
171
172 return XmlUtils.prettyPrint(root).toString();
173 }
174
175 @Override
176 public String getEntityKey() {
177 return null;
178 }
179
180 @Override
181 protected boolean isLockRequired() {
182 return false;
183 }
184
185 @Override
186 protected void loadState() {
187
188 }
189
190 @Override
191 protected void verifyPrecondition() throws CommandException {
192
193 }
194 }