001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.oozie.service.WorkflowAppService;
022import org.jdom.Element;
023import org.jdom.Namespace;
024import org.apache.oozie.client.XOozieClient;
025import org.apache.oozie.command.CommandException;
026
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.Iterator;
030import java.util.Map;
031import java.util.Set;
032
033public class SubmitMRXCommand extends SubmitHttpXCommand {
034    private static final Set<String> SKIPPED_CONFS = new HashSet<String>();
035    private static final Map<String, String> DEPRECATE_MAP = new HashMap<String, String>();
036
037    public SubmitMRXCommand(Configuration conf) {
038        super("submitMR", "submitMR", conf);
039    }
040
041    static {
042        SKIPPED_CONFS.add(WorkflowAppService.HADOOP_USER);
043        SKIPPED_CONFS.add(XOozieClient.JT);
044        SKIPPED_CONFS.add(XOozieClient.NN);
045        // a brillant mind made a change in Configuration that 'fs.default.name' key gets converted to 'fs.defaultFS'
046        // in Hadoop 0.23, we need skip that one too, keeping the old one because of Hadoop 1
047        SKIPPED_CONFS.add(XOozieClient.NN_2);
048
049        DEPRECATE_MAP.put(XOozieClient.NN, XOozieClient.NN_2);
050        DEPRECATE_MAP.put(XOozieClient.JT, XOozieClient.JT_2);
051        DEPRECATE_MAP.put(WorkflowAppService.HADOOP_USER, "mapreduce.job.user.name");
052    }
053
054    @Override
055    protected Namespace getSectionNamespace(){
056        return Namespace.getNamespace("uri:oozie:workflow:0.2");
057    }
058
059    @Override
060    protected String getWorkflowName(){
061        return "mapreduce";
062    }
063
064    private Element generateConfigurationSection(Configuration conf, Namespace ns) {
065        Element configuration = null;
066        Iterator<Map.Entry<String, String>> iter = conf.iterator();
067        while (iter.hasNext()) {
068            Map.Entry<String, String> entry = iter.next();
069            String name = entry.getKey();
070            if (MANDATORY_OOZIE_CONFS.contains(name) || OPTIONAL_OOZIE_CONFS.contains(name)
071                    || SKIPPED_CONFS.contains(name)
072                    || DEPRECATE_MAP.containsValue(name)) {
073                continue;
074            }
075
076            if (configuration == null) {
077                configuration = new Element("configuration", ns);
078            }
079
080            String value = entry.getValue();
081            Element property = new Element("property", ns);
082            Element nameElement = new Element("name", ns);
083            nameElement.addContent(name != null ? name : "");
084            property.addContent(nameElement);
085            Element valueElement = new Element("value", ns);
086            valueElement.addContent(value != null ? value : "");
087            property.addContent(valueElement);
088            configuration.addContent(property);
089        }
090
091        return configuration;
092    }
093
094    @Override
095    protected Element generateSection(Configuration conf, Namespace ns) {
096        Element mapreduce = new Element("map-reduce", ns);
097        Element jt = new Element("job-tracker", ns);
098        String newJTVal = conf.get(DEPRECATE_MAP.get(XOozieClient.JT));
099        jt.addContent(newJTVal != null ? newJTVal : (conf.get(XOozieClient.JT)));
100        mapreduce.addContent(jt);
101        Element nn = new Element("name-node", ns);
102        String newNNVal = conf.get(DEPRECATE_MAP.get(XOozieClient.NN));
103        nn.addContent(newNNVal != null ? newNNVal : (conf.get(XOozieClient.NN)));
104        mapreduce.addContent(nn);
105
106        if (conf.size() > MANDATORY_OOZIE_CONFS.size()) { // excluding JT, NN,
107                                                          // LIBPATH
108            // configuration section
109            Element configuration = generateConfigurationSection(conf, ns);
110            if (configuration != null) {
111                mapreduce.addContent(configuration);
112            }
113
114            // file section
115            addFileSection(mapreduce, conf, ns);
116
117            // archive section
118            addArchiveSection(mapreduce, conf, ns);
119        }
120
121        return mapreduce;
122    }
123
124    @Override
125    protected void checkMandatoryConf(Configuration conf) {
126        for (String key : MANDATORY_OOZIE_CONFS) {
127            String value = conf.get(key);
128            if(value == null) {
129                if(DEPRECATE_MAP.containsKey(key)) {
130                    if(conf.get(DEPRECATE_MAP.get(key)) == null) {
131                        throw new RuntimeException(key + " or " + DEPRECATE_MAP.get(key) + " is not specified");
132                    }
133                }
134                else {
135                    throw new RuntimeException(key + " is not specified");
136                }
137            }
138        }
139    }
140
141    @Override
142    public String getEntityKey() {
143        return null;
144    }
145
146    @Override
147    protected boolean isLockRequired() {
148        return false;
149    }
150
151    @Override
152    protected void loadState() {
153
154    }
155
156    @Override
157    protected void verifyPrecondition() throws CommandException {
158
159    }
160}