This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.oozie.service.WorkflowAppService;
022 import org.apache.oozie.util.XmlUtils;
023 import org.jdom.Element;
024 import org.jdom.Namespace;
025 import org.apache.oozie.client.XOozieClient;
026 import org.apache.oozie.command.CommandException;
027
028 import java.util.HashSet;
029 import java.util.Iterator;
030 import java.util.Map;
031 import java.util.Set;
032
033 public class SubmitMRXCommand extends SubmitHttpXCommand {
034 private static final Set<String> SKIPPED_CONFS = new HashSet<String>();
035
036 public SubmitMRXCommand(Configuration conf, String authToken) {
037 super("submitMR", "submitMR", conf, authToken);
038 }
039
040 static {
041 SKIPPED_CONFS.add(WorkflowAppService.HADOOP_USER);
042 SKIPPED_CONFS.add(XOozieClient.JT);
043 SKIPPED_CONFS.add(XOozieClient.NN);
044 // a brillant mind made a change in Configuration that 'fs.default.name' key gets converted to 'fs.defaultFS'
045 // in Hadoop 0.23, we need skip that one too, keeping the old one because of Hadoop 1
046 SKIPPED_CONFS.add("fs.defaultFS");
047 }
048
049 private Element generateConfigurationSection(Configuration conf, Namespace ns) {
050 Element configuration = null;
051 Iterator<Map.Entry<String, String>> iter = conf.iterator();
052 while (iter.hasNext()) {
053 Map.Entry<String, String> entry = iter.next();
054 String name = entry.getKey();
055 if (MANDATORY_OOZIE_CONFS.contains(name) || OPTIONAL_OOZIE_CONFS.contains(name)
056 || SKIPPED_CONFS.contains(name)) {
057 continue;
058 }
059
060 if (configuration == null) {
061 configuration = new Element("configuration", ns);
062 }
063
064 String value = entry.getValue();
065 Element property = new Element("property", ns);
066 Element nameElement = new Element("name", ns);
067 nameElement.addContent(name != null ? name : "");
068 property.addContent(nameElement);
069 Element valueElement = new Element("value", ns);
070 valueElement.addContent(value != null ? value : "");
071 property.addContent(valueElement);
072 configuration.addContent(property);
073 }
074
075 return configuration;
076 }
077
078 private Element generateMRSection(Configuration conf, Namespace ns) {
079 Element mapreduce = new Element("map-reduce", ns);
080 Element jt = new Element("job-tracker", ns);
081 jt.addContent(conf.get(XOozieClient.JT));
082 mapreduce.addContent(jt);
083 Element nn = new Element("name-node", ns);
084 nn.addContent(conf.get(XOozieClient.NN));
085 mapreduce.addContent(nn);
086
087 if (conf.size() > MANDATORY_OOZIE_CONFS.size()) { // excluding JT, NN,
088 // LIBPATH
089 // configuration section
090 Element configuration = generateConfigurationSection(conf, ns);
091 if (configuration != null) {
092 mapreduce.addContent(configuration);
093 }
094
095 // file section
096 addFileSection(mapreduce, conf, ns);
097
098 // archive section
099 addArchiveSection(mapreduce, conf, ns);
100 }
101
102 return mapreduce;
103 }
104
105 /*
106 * (non-Javadoc)
107 *
108 * @see
109 * org.apache.oozie.command.wf.SubmitHttpCommand#getWorkflowXml(org.apache
110 * .hadoop.conf.Configuration)
111 */
112 @Override
113 protected String getWorkflowXml(Configuration conf) {
114 for (String key : MANDATORY_OOZIE_CONFS) {
115 String value = conf.get(key);
116 if (value == null) {
117 throw new RuntimeException(key + " is not specified");
118 }
119 }
120
121 Namespace ns = Namespace.getNamespace("uri:oozie:workflow:0.2");
122 Element root = new Element("workflow-app", ns);
123 root.setAttribute("name", "oozie-mapreduce");
124
125 Element start = new Element("start", ns);
126 start.setAttribute("to", "hadoop1");
127 root.addContent(start);
128
129 Element action = new Element("action", ns);
130 action.setAttribute("name", "hadoop1");
131
132 Element mapreduce = generateMRSection(conf, ns);
133 action.addContent(mapreduce);
134
135 Element ok = new Element("ok", ns);
136 ok.setAttribute("to", "end");
137 action.addContent(ok);
138
139 Element error = new Element("error", ns);
140 error.setAttribute("to", "fail");
141 action.addContent(error);
142
143 root.addContent(action);
144
145 Element kill = new Element("kill", ns);
146 kill.setAttribute("name", "fail");
147 Element message = new Element("message", ns);
148 message.addContent("Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]");
149 kill.addContent(message);
150 root.addContent(kill);
151
152 Element end = new Element("end", ns);
153 end.setAttribute("name", "end");
154 root.addContent(end);
155
156 return XmlUtils.prettyPrint(root).toString();
157 }
158
159 @Override
160 public String getEntityKey() {
161 return null;
162 }
163
164 @Override
165 protected boolean isLockRequired() {
166 return false;
167 }
168
169 @Override
170 protected void loadState() {
171
172 }
173
174 @Override
175 protected void verifyPrecondition() throws CommandException {
176
177 }
178 }