001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.oozie.service.WorkflowAppService;
022    import org.apache.oozie.util.XmlUtils;
023    import org.jdom.Element;
024    import org.jdom.Namespace;
025    import org.apache.oozie.client.XOozieClient;
026    
027    import java.util.HashSet;
028    import java.util.Iterator;
029    import java.util.Map;
030    import java.util.Set;
031    
032    public class SubmitMRCommand extends SubmitHttpCommand {
033        private static final Set<String> SKIPPED_CONFS = new HashSet<String>();
034    
035        public SubmitMRCommand(Configuration conf, String authToken) {
036            super("submitMR", "submitMR", conf, authToken);
037        }
038    
039        static {
040            SKIPPED_CONFS.add(WorkflowAppService.HADOOP_USER);
041            SKIPPED_CONFS.add(WorkflowAppService.HADOOP_UGI);
042            SKIPPED_CONFS.add(XOozieClient.JT);
043            SKIPPED_CONFS.add(XOozieClient.NN);
044            SKIPPED_CONFS.add(WorkflowAppService.HADOOP_JT_KERBEROS_NAME);
045            SKIPPED_CONFS.add(WorkflowAppService.HADOOP_NN_KERBEROS_NAME);
046        }
047    
048        private Element generateConfigurationSection(Configuration conf, Namespace ns) {
049            Element configuration = null;
050            Iterator<Map.Entry<String, String>> iter = conf.iterator();
051            while (iter.hasNext()) {
052                Map.Entry<String, String> entry = iter.next();
053                String name = entry.getKey();
054                if (MANDATORY_OOZIE_CONFS.contains(name) || OPTIONAL_OOZIE_CONFS.contains(name)
055                        || SKIPPED_CONFS.contains(name)) {
056                    continue;
057                }
058    
059                if (configuration == null) {
060                    configuration = new Element("configuration", ns);
061                }
062    
063                String value = entry.getValue();
064                Element property = new Element("property", ns);
065                Element nameElement = new Element("name", ns);
066                nameElement.addContent(name != null ? name : "");
067                property.addContent(nameElement);
068                Element valueElement = new Element("value", ns);
069                valueElement.addContent(value != null ? value : "");
070                property.addContent(valueElement);
071                configuration.addContent(property);
072            }
073    
074            return configuration;
075        }
076    
077        private Element generateMRSection(Configuration conf, Namespace ns) {
078            Element mapreduce = new Element("map-reduce", ns);
079            Element jt = new Element("job-tracker", ns);
080            jt.addContent(conf.get(XOozieClient.JT));
081            mapreduce.addContent(jt);
082            Element nn = new Element("name-node", ns);
083            nn.addContent(conf.get(XOozieClient.NN));
084            mapreduce.addContent(nn);
085    
086            if (conf.size() > MANDATORY_OOZIE_CONFS.size()) { // excluding JT, NN,
087                                                              // LIBPATH
088                // configuration section
089                Element configuration = generateConfigurationSection(conf, ns);
090                if (configuration != null) {
091                    mapreduce.addContent(configuration);
092                }
093    
094                // file section
095                addFileSection(mapreduce, conf, ns);
096    
097                // archive section
098                addArchiveSection(mapreduce, conf, ns);
099            }
100    
101            return mapreduce;
102        }
103    
104        /*
105         * (non-Javadoc)
106         *
107         * @see
108         * org.apache.oozie.command.wf.SubmitHttpCommand#getWorkflowXml(org.apache
109         * .hadoop.conf.Configuration)
110         */
111        @Override
112        protected String getWorkflowXml(Configuration conf) {
113            for (String key : MANDATORY_OOZIE_CONFS) {
114                String value = conf.get(key);
115                if (value == null) {
116                    throw new RuntimeException(key + " is not specified");
117                }
118            }
119    
120            Namespace ns = Namespace.getNamespace("uri:oozie:workflow:0.2");
121            Element root = new Element("workflow-app", ns);
122            root.setAttribute("name", "oozie-mapreduce");
123    
124            Element start = new Element("start", ns);
125            start.setAttribute("to", "hadoop1");
126            root.addContent(start);
127    
128            Element action = new Element("action", ns);
129            action.setAttribute("name", "hadoop1");
130    
131            Element mapreduce = generateMRSection(conf, ns);
132            action.addContent(mapreduce);
133    
134            Element ok = new Element("ok", ns);
135            ok.setAttribute("to", "end");
136            action.addContent(ok);
137    
138            Element error = new Element("error", ns);
139            error.setAttribute("to", "fail");
140            action.addContent(error);
141    
142            root.addContent(action);
143    
144            Element kill = new Element("kill", ns);
145            kill.setAttribute("name", "fail");
146            Element message = new Element("message", ns);
147            message.addContent("Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]");
148            kill.addContent(message);
149            root.addContent(kill);
150    
151            Element end = new Element("end", ns);
152            end.setAttribute("name", "end");
153            root.addContent(end);
154    
155            return XmlUtils.prettyPrint(root).toString();
156        }
157    }