001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.oozie.service.WorkflowAppService;
022    import org.apache.oozie.util.XmlUtils;
023    import org.jdom.Element;
024    import org.jdom.Namespace;
025    import org.apache.oozie.client.XOozieClient;
026    import org.apache.oozie.command.CommandException;
027    
028    import java.util.HashSet;
029    import java.util.Iterator;
030    import java.util.Map;
031    import java.util.Set;
032    
033    public class SubmitMRXCommand extends SubmitHttpXCommand {
034        private static final Set<String> SKIPPED_CONFS = new HashSet<String>();
035    
036        public SubmitMRXCommand(Configuration conf, String authToken) {
037            super("submitMR", "submitMR", conf, authToken);
038        }
039    
040        static {
041            SKIPPED_CONFS.add(WorkflowAppService.HADOOP_USER);
042            SKIPPED_CONFS.add(XOozieClient.JT);
043            SKIPPED_CONFS.add(XOozieClient.NN);
044            // a brillant mind made a change in Configuration that 'fs.default.name' key gets converted to 'fs.defaultFS'
045            // in Hadoop 0.23, we need skip that one too, keeping the old one because of Hadoop 1
046            SKIPPED_CONFS.add("fs.defaultFS");
047        }
048    
049        private Element generateConfigurationSection(Configuration conf, Namespace ns) {
050            Element configuration = null;
051            Iterator<Map.Entry<String, String>> iter = conf.iterator();
052            while (iter.hasNext()) {
053                Map.Entry<String, String> entry = iter.next();
054                String name = entry.getKey();
055                if (MANDATORY_OOZIE_CONFS.contains(name) || OPTIONAL_OOZIE_CONFS.contains(name)
056                        || SKIPPED_CONFS.contains(name)) {
057                    continue;
058                }
059    
060                if (configuration == null) {
061                    configuration = new Element("configuration", ns);
062                }
063    
064                String value = entry.getValue();
065                Element property = new Element("property", ns);
066                Element nameElement = new Element("name", ns);
067                nameElement.addContent(name != null ? name : "");
068                property.addContent(nameElement);
069                Element valueElement = new Element("value", ns);
070                valueElement.addContent(value != null ? value : "");
071                property.addContent(valueElement);
072                configuration.addContent(property);
073            }
074    
075            return configuration;
076        }
077    
078        private Element generateMRSection(Configuration conf, Namespace ns) {
079            Element mapreduce = new Element("map-reduce", ns);
080            Element jt = new Element("job-tracker", ns);
081            jt.addContent(conf.get(XOozieClient.JT));
082            mapreduce.addContent(jt);
083            Element nn = new Element("name-node", ns);
084            nn.addContent(conf.get(XOozieClient.NN));
085            mapreduce.addContent(nn);
086    
087            if (conf.size() > MANDATORY_OOZIE_CONFS.size()) { // excluding JT, NN,
088                                                              // LIBPATH
089                // configuration section
090                Element configuration = generateConfigurationSection(conf, ns);
091                if (configuration != null) {
092                    mapreduce.addContent(configuration);
093                }
094    
095                // file section
096                addFileSection(mapreduce, conf, ns);
097    
098                // archive section
099                addArchiveSection(mapreduce, conf, ns);
100            }
101    
102            return mapreduce;
103        }
104    
105        /*
106         * (non-Javadoc)
107         *
108         * @see
109         * org.apache.oozie.command.wf.SubmitHttpCommand#getWorkflowXml(org.apache
110         * .hadoop.conf.Configuration)
111         */
112        @Override
113        protected String getWorkflowXml(Configuration conf) {
114            for (String key : MANDATORY_OOZIE_CONFS) {
115                String value = conf.get(key);
116                if (value == null) {
117                    throw new RuntimeException(key + " is not specified");
118                }
119            }
120    
121            Namespace ns = Namespace.getNamespace("uri:oozie:workflow:0.2");
122            Element root = new Element("workflow-app", ns);
123            root.setAttribute("name", "oozie-mapreduce");
124    
125            Element start = new Element("start", ns);
126            start.setAttribute("to", "hadoop1");
127            root.addContent(start);
128    
129            Element action = new Element("action", ns);
130            action.setAttribute("name", "hadoop1");
131    
132            Element mapreduce = generateMRSection(conf, ns);
133            action.addContent(mapreduce);
134    
135            Element ok = new Element("ok", ns);
136            ok.setAttribute("to", "end");
137            action.addContent(ok);
138    
139            Element error = new Element("error", ns);
140            error.setAttribute("to", "fail");
141            action.addContent(error);
142    
143            root.addContent(action);
144    
145            Element kill = new Element("kill", ns);
146            kill.setAttribute("name", "fail");
147            Element message = new Element("message", ns);
148            message.addContent("Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]");
149            kill.addContent(message);
150            root.addContent(kill);
151    
152            Element end = new Element("end", ns);
153            end.setAttribute("name", "end");
154            root.addContent(end);
155    
156            return XmlUtils.prettyPrint(root).toString();
157        }
158    
159        @Override
160        public String getEntityKey() {
161            return null;
162        }
163    
164        @Override
165        protected boolean isLockRequired() {
166            return false;
167        }
168    
169        @Override
170        protected void loadState() {
171    
172        }
173    
174        @Override
175        protected void verifyPrecondition() throws CommandException {
176    
177        }
178    }