This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.oozie.service.WorkflowAppService;
022 import org.apache.oozie.util.XmlUtils;
023 import org.jdom.Element;
024 import org.jdom.Namespace;
025 import org.apache.oozie.client.XOozieClient;
026
027 import java.util.HashSet;
028 import java.util.Iterator;
029 import java.util.Map;
030 import java.util.Set;
031
032 public class SubmitMRCommand extends SubmitHttpCommand {
033 private static final Set<String> SKIPPED_CONFS = new HashSet<String>();
034
035 public SubmitMRCommand(Configuration conf, String authToken) {
036 super("submitMR", "submitMR", conf, authToken);
037 }
038
039 static {
040 SKIPPED_CONFS.add(WorkflowAppService.HADOOP_USER);
041 SKIPPED_CONFS.add(WorkflowAppService.HADOOP_UGI);
042 SKIPPED_CONFS.add(XOozieClient.JT);
043 SKIPPED_CONFS.add(XOozieClient.NN);
044 SKIPPED_CONFS.add(WorkflowAppService.HADOOP_JT_KERBEROS_NAME);
045 SKIPPED_CONFS.add(WorkflowAppService.HADOOP_NN_KERBEROS_NAME);
046 }
047
048 private Element generateConfigurationSection(Configuration conf, Namespace ns) {
049 Element configuration = null;
050 Iterator<Map.Entry<String, String>> iter = conf.iterator();
051 while (iter.hasNext()) {
052 Map.Entry<String, String> entry = iter.next();
053 String name = entry.getKey();
054 if (MANDATORY_OOZIE_CONFS.contains(name) || OPTIONAL_OOZIE_CONFS.contains(name)
055 || SKIPPED_CONFS.contains(name)) {
056 continue;
057 }
058
059 if (configuration == null) {
060 configuration = new Element("configuration", ns);
061 }
062
063 String value = entry.getValue();
064 Element property = new Element("property", ns);
065 Element nameElement = new Element("name", ns);
066 nameElement.addContent(name != null ? name : "");
067 property.addContent(nameElement);
068 Element valueElement = new Element("value", ns);
069 valueElement.addContent(value != null ? value : "");
070 property.addContent(valueElement);
071 configuration.addContent(property);
072 }
073
074 return configuration;
075 }
076
077 private Element generateMRSection(Configuration conf, Namespace ns) {
078 Element mapreduce = new Element("map-reduce", ns);
079 Element jt = new Element("job-tracker", ns);
080 jt.addContent(conf.get(XOozieClient.JT));
081 mapreduce.addContent(jt);
082 Element nn = new Element("name-node", ns);
083 nn.addContent(conf.get(XOozieClient.NN));
084 mapreduce.addContent(nn);
085
086 if (conf.size() > MANDATORY_OOZIE_CONFS.size()) { // excluding JT, NN,
087 // LIBPATH
088 // configuration section
089 Element configuration = generateConfigurationSection(conf, ns);
090 if (configuration != null) {
091 mapreduce.addContent(configuration);
092 }
093
094 // file section
095 addFileSection(mapreduce, conf, ns);
096
097 // archive section
098 addArchiveSection(mapreduce, conf, ns);
099 }
100
101 return mapreduce;
102 }
103
104 /*
105 * (non-Javadoc)
106 *
107 * @see
108 * org.apache.oozie.command.wf.SubmitHttpCommand#getWorkflowXml(org.apache
109 * .hadoop.conf.Configuration)
110 */
111 @Override
112 protected String getWorkflowXml(Configuration conf) {
113 for (String key : MANDATORY_OOZIE_CONFS) {
114 String value = conf.get(key);
115 if (value == null) {
116 throw new RuntimeException(key + " is not specified");
117 }
118 }
119
120 Namespace ns = Namespace.getNamespace("uri:oozie:workflow:0.2");
121 Element root = new Element("workflow-app", ns);
122 root.setAttribute("name", "oozie-mapreduce");
123
124 Element start = new Element("start", ns);
125 start.setAttribute("to", "hadoop1");
126 root.addContent(start);
127
128 Element action = new Element("action", ns);
129 action.setAttribute("name", "hadoop1");
130
131 Element mapreduce = generateMRSection(conf, ns);
132 action.addContent(mapreduce);
133
134 Element ok = new Element("ok", ns);
135 ok.setAttribute("to", "end");
136 action.addContent(ok);
137
138 Element error = new Element("error", ns);
139 error.setAttribute("to", "fail");
140 action.addContent(error);
141
142 root.addContent(action);
143
144 Element kill = new Element("kill", ns);
145 kill.setAttribute("name", "fail");
146 Element message = new Element("message", ns);
147 message.addContent("Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]");
148 kill.addContent(message);
149 root.addContent(kill);
150
151 Element end = new Element("end", ns);
152 end.setAttribute("name", "end");
153 root.addContent(end);
154
155 return XmlUtils.prettyPrint(root).toString();
156 }
157 }