001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.oozie.service.WorkflowAppService;
022    import org.apache.oozie.util.XmlUtils;
023    import org.jdom.Element;
024    import org.jdom.Namespace;
025    import org.apache.oozie.client.XOozieClient;
026    import org.apache.oozie.command.CommandException;
027    
028    import java.util.HashMap;
029    import java.util.HashSet;
030    import java.util.Iterator;
031    import java.util.Map;
032    import java.util.Set;
033    
034    public class SubmitMRXCommand extends SubmitHttpXCommand {
035        private static final Set<String> SKIPPED_CONFS = new HashSet<String>();
036        private static final Map<String, String> DEPRECATE_MAP = new HashMap<String, String>();
037    
038        public SubmitMRXCommand(Configuration conf, String authToken) {
039            super("submitMR", "submitMR", conf, authToken);
040        }
041    
042        static {
043            SKIPPED_CONFS.add(WorkflowAppService.HADOOP_USER);
044            SKIPPED_CONFS.add(XOozieClient.JT);
045            SKIPPED_CONFS.add(XOozieClient.NN);
046            // a brillant mind made a change in Configuration that 'fs.default.name' key gets converted to 'fs.defaultFS'
047            // in Hadoop 0.23, we need skip that one too, keeping the old one because of Hadoop 1
048            SKIPPED_CONFS.add(XOozieClient.NN_2);
049    
050            DEPRECATE_MAP.put(XOozieClient.NN, XOozieClient.NN_2);
051            DEPRECATE_MAP.put(XOozieClient.JT, XOozieClient.JT_2);
052            DEPRECATE_MAP.put(WorkflowAppService.HADOOP_USER, "mapreduce.job.user.name");
053        }
054    
055        private Element generateConfigurationSection(Configuration conf, Namespace ns) {
056            Element configuration = null;
057            Iterator<Map.Entry<String, String>> iter = conf.iterator();
058            while (iter.hasNext()) {
059                Map.Entry<String, String> entry = iter.next();
060                String name = entry.getKey();
061                if (MANDATORY_OOZIE_CONFS.contains(name) || OPTIONAL_OOZIE_CONFS.contains(name)
062                        || SKIPPED_CONFS.contains(name)
063                        || DEPRECATE_MAP.containsValue(name)) {
064                    continue;
065                }
066    
067                if (configuration == null) {
068                    configuration = new Element("configuration", ns);
069                }
070    
071                String value = entry.getValue();
072                Element property = new Element("property", ns);
073                Element nameElement = new Element("name", ns);
074                nameElement.addContent(name != null ? name : "");
075                property.addContent(nameElement);
076                Element valueElement = new Element("value", ns);
077                valueElement.addContent(value != null ? value : "");
078                property.addContent(valueElement);
079                configuration.addContent(property);
080            }
081    
082            return configuration;
083        }
084    
085        private Element generateMRSection(Configuration conf, Namespace ns) {
086            Element mapreduce = new Element("map-reduce", ns);
087            Element jt = new Element("job-tracker", ns);
088            String newJTVal = conf.get(DEPRECATE_MAP.get(XOozieClient.JT));
089            jt.addContent(newJTVal != null ? newJTVal : (conf.get(XOozieClient.JT)));
090            mapreduce.addContent(jt);
091            Element nn = new Element("name-node", ns);
092            String newNNVal = conf.get(DEPRECATE_MAP.get(XOozieClient.NN));
093            nn.addContent(newNNVal != null ? newNNVal : (conf.get(XOozieClient.NN)));
094            mapreduce.addContent(nn);
095    
096            if (conf.size() > MANDATORY_OOZIE_CONFS.size()) { // excluding JT, NN,
097                                                              // LIBPATH
098                // configuration section
099                Element configuration = generateConfigurationSection(conf, ns);
100                if (configuration != null) {
101                    mapreduce.addContent(configuration);
102                }
103    
104                // file section
105                addFileSection(mapreduce, conf, ns);
106    
107                // archive section
108                addArchiveSection(mapreduce, conf, ns);
109            }
110    
111            return mapreduce;
112        }
113    
114        /*
115         * (non-Javadoc)
116         *
117         * @see
118         * org.apache.oozie.command.wf.SubmitHttpCommand#getWorkflowXml(org.apache
119         * .hadoop.conf.Configuration)
120         */
121        @Override
122        protected String getWorkflowXml(Configuration conf) {
123            for (String key : MANDATORY_OOZIE_CONFS) {
124                String value = conf.get(key);
125                if(value == null) {
126                    if(DEPRECATE_MAP.containsKey(key)) {
127                        if(conf.get(DEPRECATE_MAP.get(key)) == null) {
128                            throw new RuntimeException(key + " or " + DEPRECATE_MAP.get(key) + " is not specified");
129                        }
130                    }
131                    else {
132                        throw new RuntimeException(key + " is not specified");
133                    }
134                }
135            }
136    
137            Namespace ns = Namespace.getNamespace("uri:oozie:workflow:0.2");
138            Element root = new Element("workflow-app", ns);
139            root.setAttribute("name", "oozie-mapreduce");
140    
141            Element start = new Element("start", ns);
142            start.setAttribute("to", "hadoop1");
143            root.addContent(start);
144    
145            Element action = new Element("action", ns);
146            action.setAttribute("name", "hadoop1");
147    
148            Element mapreduce = generateMRSection(conf, ns);
149            action.addContent(mapreduce);
150    
151            Element ok = new Element("ok", ns);
152            ok.setAttribute("to", "end");
153            action.addContent(ok);
154    
155            Element error = new Element("error", ns);
156            error.setAttribute("to", "fail");
157            action.addContent(error);
158    
159            root.addContent(action);
160    
161            Element kill = new Element("kill", ns);
162            kill.setAttribute("name", "fail");
163            Element message = new Element("message", ns);
164            message.addContent("Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]");
165            kill.addContent(message);
166            root.addContent(kill);
167    
168            Element end = new Element("end", ns);
169            end.setAttribute("name", "end");
170            root.addContent(end);
171    
172            return XmlUtils.prettyPrint(root).toString();
173        }
174    
175        @Override
176        public String getEntityKey() {
177            return null;
178        }
179    
180        @Override
181        protected boolean isLockRequired() {
182            return false;
183        }
184    
185        @Override
186        protected void loadState() {
187    
188        }
189    
190        @Override
191        protected void verifyPrecondition() throws CommandException {
192    
193        }
194    }