001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.wf;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.oozie.WorkflowJobBean;
023import org.apache.oozie.ErrorCode;
024import org.apache.oozie.service.JPAService;
025import org.apache.oozie.service.WorkflowStoreService;
026import org.apache.oozie.service.WorkflowAppService;
027import org.apache.oozie.service.Services;
028import org.apache.oozie.service.DagXLogInfoService;
029import org.apache.oozie.util.InstrumentUtils;
030import org.apache.oozie.util.LogUtils;
031import org.apache.oozie.util.XLog;
032import org.apache.oozie.util.ParamChecker;
033import org.apache.oozie.util.XConfiguration;
034import org.apache.oozie.util.XmlUtils;
035import org.apache.oozie.command.CommandException;
036import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
037import org.apache.oozie.store.StoreException;
038import org.apache.oozie.workflow.WorkflowApp;
039import org.apache.oozie.workflow.WorkflowException;
040import org.apache.oozie.workflow.WorkflowInstance;
041import org.apache.oozie.workflow.WorkflowLib;
042import org.apache.oozie.util.PropertiesUtils;
043import org.apache.oozie.client.OozieClient;
044import org.apache.oozie.client.WorkflowJob;
045import org.apache.oozie.client.XOozieClient;
046import org.jdom.Element;
047import org.jdom.Namespace;
048
049import java.util.Date;
050import java.util.List;
051import java.util.Map;
052import java.util.Set;
053import java.util.HashSet;
054
055public abstract class SubmitHttpXCommand extends WorkflowXCommand<String> {
056
057    protected static final Set<String> MANDATORY_OOZIE_CONFS = new HashSet<String>();
058    protected static final Set<String> OPTIONAL_OOZIE_CONFS = new HashSet<String>();
059
060    static {
061        MANDATORY_OOZIE_CONFS.add(XOozieClient.JT);
062        MANDATORY_OOZIE_CONFS.add(XOozieClient.NN);
063        MANDATORY_OOZIE_CONFS.add(OozieClient.LIBPATH);
064
065        OPTIONAL_OOZIE_CONFS.add(XOozieClient.FILES);
066        OPTIONAL_OOZIE_CONFS.add(XOozieClient.ARCHIVES);
067    }
068
069    private Configuration conf;
070
071    public SubmitHttpXCommand(String name, String type, Configuration conf) {
072        super(name, type, 1);
073        this.conf = ParamChecker.notNull(conf, "conf");
074    }
075
076    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
077    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
078
079    static {
080        String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
081                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
082                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
083                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
084        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
085
086        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
087        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
088        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
089    }
090
091    abstract protected Element generateSection(Configuration conf, Namespace ns);
092
093    abstract protected Namespace getSectionNamespace();
094
095    abstract protected String getWorkflowName();
096
097    protected void checkMandatoryConf(Configuration conf) {
098        for (String key : MANDATORY_OOZIE_CONFS) {
099            String value = conf.get(key);
100            if (value == null) {
101                throw new RuntimeException(key + " is not specified");
102            }
103        }
104    }
105
106    protected Namespace getWorkflowNamespace() {
107        return Namespace.getNamespace("uri:oozie:workflow:0.2");
108    }
109    /**
110     * Generate workflow xml from conf object
111     *
112     * @param conf the configuration object
113     * @return workflow xml def string representation
114     */
115    protected String getWorkflowXml(Configuration conf) {
116        checkMandatoryConf(conf);
117
118        Namespace ns = getWorkflowNamespace();
119        Element root = new Element("workflow-app", ns);
120        String name = getWorkflowName();
121        root.setAttribute("name", "oozie-" + name);
122
123        Element start = new Element("start", ns);
124        String nodeName = name + "1";
125        start.setAttribute("to", nodeName);
126        root.addContent(start);
127
128        Element action = new Element("action", ns);
129        action.setAttribute("name", nodeName);
130
131        Element ele = generateSection(conf, getSectionNamespace());
132        action.addContent(ele);
133
134        Element ok = new Element("ok", ns);
135        ok.setAttribute("to", "end");
136        action.addContent(ok);
137
138        Element error = new Element("error", ns);
139        error.setAttribute("to", "fail");
140        action.addContent(error);
141
142        root.addContent(action);
143
144        Element kill = new Element("kill", ns);
145        kill.setAttribute("name", "fail");
146        Element message = new Element("message", ns);
147        message.addContent(name + " failed, error message[${wf:errorMessage(wf:lastErrorNode())}]");
148        kill.addContent(message);
149        root.addContent(kill);
150
151        Element end = new Element("end", ns);
152        end.setAttribute("name", "end");
153        root.addContent(end);
154
155        return XmlUtils.prettyPrint(root).toString();
156    };
157
158    protected Element generateConfigurationSection(List<String> Dargs, Namespace ns) {
159        Element configuration = new Element("configuration", ns);
160        for (String arg : Dargs) {
161            String name = null, value = null;
162            int pos = arg.indexOf("=");
163            if (pos == -1) { // "-D<name>" or "-D" only
164                name = arg.substring(2, arg.length());
165                value = "";
166            }
167            else { // "-D<name>=<value>"
168                name = arg.substring(2, pos);
169                value = arg.substring(pos + 1, arg.length());
170            }
171
172            Element property = new Element("property", ns);
173            Element nameElement = new Element("name", ns);
174            nameElement.addContent(name);
175            property.addContent(nameElement);
176            Element valueElement = new Element("value", ns);
177            valueElement.addContent(value);
178            property.addContent(valueElement);
179            configuration.addContent(property);
180        }
181
182        return configuration;
183    }
184
185    /* (non-Javadoc)
186     * @see org.apache.oozie.command.XCommand#execute()
187     */
188    @Override
189    protected String execute() throws CommandException {
190        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
191        WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
192        try {
193            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
194            String wfXml = getWorkflowXml(conf);
195            LOG.debug("workflow xml created on the server side is :\n");
196            LOG.debug(wfXml);
197            WorkflowApp app = wps.parseDef(wfXml, conf);
198            XConfiguration protoActionConf = wps.createProtoActionConf(conf, false);
199            WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
200
201            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
202
203            // Resolving all variables in the job properties.
204            // This ensures the Hadoop Configuration semantics is preserved.
205            XConfiguration resolvedVarsConf = new XConfiguration();
206            for (Map.Entry<String, String> entry : conf) {
207                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
208            }
209            conf = resolvedVarsConf;
210
211            WorkflowInstance wfInstance;
212            try {
213                wfInstance = workflowLib.createInstance(app, conf);
214            }
215            catch (WorkflowException e) {
216                throw new StoreException(e);
217            }
218
219            Configuration conf = wfInstance.getConf();
220
221            WorkflowJobBean workflow = new WorkflowJobBean();
222            workflow.setId(wfInstance.getId());
223            workflow.setAppName(app.getName());
224            workflow.setAppPath(conf.get(OozieClient.APP_PATH));
225            workflow.setConf(XmlUtils.prettyPrint(conf).toString());
226            workflow.setProtoActionConf(protoActionConf.toXmlString());
227            workflow.setCreatedTime(new Date());
228            workflow.setLastModifiedTime(new Date());
229            workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
230            workflow.setStatus(WorkflowJob.Status.PREP);
231            workflow.setRun(0);
232            workflow.setUser(conf.get(OozieClient.USER_NAME));
233            workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
234            workflow.setWorkflowInstance(wfInstance);
235            workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
236
237            LogUtils.setLogInfo(workflow);
238            JPAService jpaService = Services.get().get(JPAService.class);
239            if (jpaService != null) {
240                jpaService.execute(new WorkflowJobInsertJPAExecutor(workflow));
241            }
242            else {
243                LOG.error(ErrorCode.E0610);
244                return null;
245            }
246
247            return workflow.getId();
248        }
249        catch (WorkflowException ex) {
250            throw new CommandException(ex);
251        }
252        catch (Exception ex) {
253            throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
254        }
255    }
256
257    static private void addSection(Element X, Namespace ns, String filesStr, String tagName) {
258        if (filesStr != null) {
259            String[] files = filesStr.split(",");
260            for (String f : files) {
261                Element tagElement = new Element(tagName, ns);
262                if (f.contains("#")) {
263                    tagElement.addContent(f);
264                }
265                else {
266                    String filename = f.substring(f.lastIndexOf("/") + 1, f.length());
267                    if (filename == null || filename.isEmpty()) {
268                        tagElement.addContent(f);
269                    }
270                    else {
271                        tagElement.addContent(f + "#" + filename);
272                    }
273                }
274                X.addContent(tagElement);
275            }
276        }
277    }
278
279    /**
280     * Add file section in X.
281     *
282     * @param parent XML element to be appended
283     * @param conf Configuration object
284     * @param ns XML element namespace
285     */
286    static void addFileSection(Element X, Configuration conf, Namespace ns) {
287        String filesStr = conf.get(XOozieClient.FILES);
288        addSection(X, ns, filesStr, "file");
289    }
290
291    /**
292     * Add archive section in X.
293     *
294     * @param parent XML element to be appended
295     * @param conf Configuration object
296     * @param ns XML element namespace
297     */
298    static void addArchiveSection(Element X, Configuration conf, Namespace ns) {
299        String archivesStr = conf.get(XOozieClient.ARCHIVES);
300        addSection(X, ns, archivesStr, "archive");
301    }
302}