001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.oozie.WorkflowJobBean;
022import org.apache.oozie.ErrorCode;
023import org.apache.oozie.service.JPAService;
024import org.apache.oozie.service.WorkflowStoreService;
025import org.apache.oozie.service.WorkflowAppService;
026import org.apache.oozie.service.Services;
027import org.apache.oozie.service.DagXLogInfoService;
028import org.apache.oozie.util.InstrumentUtils;
029import org.apache.oozie.util.LogUtils;
030import org.apache.oozie.util.XLog;
031import org.apache.oozie.util.ParamChecker;
032import org.apache.oozie.util.XConfiguration;
033import org.apache.oozie.util.XmlUtils;
034import org.apache.oozie.command.CommandException;
035import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
036import org.apache.oozie.store.StoreException;
037import org.apache.oozie.workflow.WorkflowApp;
038import org.apache.oozie.workflow.WorkflowException;
039import org.apache.oozie.workflow.WorkflowInstance;
040import org.apache.oozie.workflow.WorkflowLib;
041import org.apache.oozie.util.PropertiesUtils;
042import org.apache.oozie.client.OozieClient;
043import org.apache.oozie.client.WorkflowJob;
044import org.apache.oozie.client.XOozieClient;
045import org.jdom.Element;
046import org.jdom.Namespace;
047
048import java.util.Date;
049import java.util.List;
050import java.util.Map;
051import java.util.Set;
052import java.util.HashSet;
053
054public abstract class SubmitHttpXCommand extends WorkflowXCommand<String> {
055
056    protected static final Set<String> MANDATORY_OOZIE_CONFS = new HashSet<String>();
057    protected static final Set<String> OPTIONAL_OOZIE_CONFS = new HashSet<String>();
058
059    static {
060        MANDATORY_OOZIE_CONFS.add(XOozieClient.JT);
061        MANDATORY_OOZIE_CONFS.add(XOozieClient.NN);
062        MANDATORY_OOZIE_CONFS.add(OozieClient.LIBPATH);
063
064        OPTIONAL_OOZIE_CONFS.add(XOozieClient.FILES);
065        OPTIONAL_OOZIE_CONFS.add(XOozieClient.ARCHIVES);
066    }
067
068    private Configuration conf;
069
070    public SubmitHttpXCommand(String name, String type, Configuration conf) {
071        super(name, type, 1);
072        this.conf = ParamChecker.notNull(conf, "conf");
073    }
074
075    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
076    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
077
078    static {
079        String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
080                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
081                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
082                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
083        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
084
085        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
086        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
087        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
088    }
089
090    abstract protected Element generateSection(Configuration conf, Namespace ns);
091
092    abstract protected Namespace getSectionNamespace();
093
094    abstract protected String getWorkflowName();
095
096    protected void checkMandatoryConf(Configuration conf) {
097        for (String key : MANDATORY_OOZIE_CONFS) {
098            String value = conf.get(key);
099            if (value == null) {
100                throw new RuntimeException(key + " is not specified");
101            }
102        }
103    }
104
105    protected Namespace getWorkflowNamespace() {
106        return Namespace.getNamespace("uri:oozie:workflow:0.2");
107    }
108    /**
109     * Generate workflow xml from conf object
110     *
111     * @param conf the configuration object
112     * @return workflow xml def string representation
113     */
114    protected String getWorkflowXml(Configuration conf) {
115        checkMandatoryConf(conf);
116
117        Namespace ns = getWorkflowNamespace();
118        Element root = new Element("workflow-app", ns);
119        String name = getWorkflowName();
120        root.setAttribute("name", "oozie-" + name);
121
122        Element start = new Element("start", ns);
123        String nodeName = name + "1";
124        start.setAttribute("to", nodeName);
125        root.addContent(start);
126
127        Element action = new Element("action", ns);
128        action.setAttribute("name", nodeName);
129
130        Element ele = generateSection(conf, getSectionNamespace());
131        action.addContent(ele);
132
133        Element ok = new Element("ok", ns);
134        ok.setAttribute("to", "end");
135        action.addContent(ok);
136
137        Element error = new Element("error", ns);
138        error.setAttribute("to", "fail");
139        action.addContent(error);
140
141        root.addContent(action);
142
143        Element kill = new Element("kill", ns);
144        kill.setAttribute("name", "fail");
145        Element message = new Element("message", ns);
146        message.addContent(name + " failed, error message[${wf:errorMessage(wf:lastErrorNode())}]");
147        kill.addContent(message);
148        root.addContent(kill);
149
150        Element end = new Element("end", ns);
151        end.setAttribute("name", "end");
152        root.addContent(end);
153
154        return XmlUtils.prettyPrint(root).toString();
155    };
156
157    protected Element generateConfigurationSection(List<String> Dargs, Namespace ns) {
158        Element configuration = new Element("configuration", ns);
159        for (String arg : Dargs) {
160            String name = null, value = null;
161            int pos = arg.indexOf("=");
162            if (pos == -1) { // "-D<name>" or "-D" only
163                name = arg.substring(2, arg.length());
164                value = "";
165            }
166            else { // "-D<name>=<value>"
167                name = arg.substring(2, pos);
168                value = arg.substring(pos + 1, arg.length());
169            }
170
171            Element property = new Element("property", ns);
172            Element nameElement = new Element("name", ns);
173            nameElement.addContent(name);
174            property.addContent(nameElement);
175            Element valueElement = new Element("value", ns);
176            valueElement.addContent(value);
177            property.addContent(valueElement);
178            configuration.addContent(property);
179        }
180
181        return configuration;
182    }
183
184    /* (non-Javadoc)
185     * @see org.apache.oozie.command.XCommand#execute()
186     */
187    @Override
188    protected String execute() throws CommandException {
189        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
190        WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
191        try {
192            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
193            String wfXml = getWorkflowXml(conf);
194            LOG.debug("workflow xml created on the server side is :\n");
195            LOG.debug(wfXml);
196            WorkflowApp app = wps.parseDef(wfXml, conf);
197            XConfiguration protoActionConf = wps.createProtoActionConf(conf, false);
198            WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
199
200            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
201
202            // Resolving all variables in the job properties.
203            // This ensures the Hadoop Configuration semantics is preserved.
204            XConfiguration resolvedVarsConf = new XConfiguration();
205            for (Map.Entry<String, String> entry : conf) {
206                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
207            }
208            conf = resolvedVarsConf;
209
210            WorkflowInstance wfInstance;
211            try {
212                wfInstance = workflowLib.createInstance(app, conf);
213            }
214            catch (WorkflowException e) {
215                throw new StoreException(e);
216            }
217
218            Configuration conf = wfInstance.getConf();
219
220            WorkflowJobBean workflow = new WorkflowJobBean();
221            workflow.setId(wfInstance.getId());
222            workflow.setAppName(app.getName());
223            workflow.setAppPath(conf.get(OozieClient.APP_PATH));
224            workflow.setConf(XmlUtils.prettyPrint(conf).toString());
225            workflow.setProtoActionConf(protoActionConf.toXmlString());
226            workflow.setCreatedTime(new Date());
227            workflow.setLastModifiedTime(new Date());
228            workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
229            workflow.setStatus(WorkflowJob.Status.PREP);
230            workflow.setRun(0);
231            workflow.setUser(conf.get(OozieClient.USER_NAME));
232            workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
233            workflow.setWorkflowInstance(wfInstance);
234            workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
235
236            LogUtils.setLogInfo(workflow, logInfo);
237            JPAService jpaService = Services.get().get(JPAService.class);
238            if (jpaService != null) {
239                jpaService.execute(new WorkflowJobInsertJPAExecutor(workflow));
240            }
241            else {
242                LOG.error(ErrorCode.E0610);
243                return null;
244            }
245
246            return workflow.getId();
247        }
248        catch (WorkflowException ex) {
249            throw new CommandException(ex);
250        }
251        catch (Exception ex) {
252            throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
253        }
254    }
255
256    static private void addSection(Element X, Namespace ns, String filesStr, String tagName) {
257        if (filesStr != null) {
258            String[] files = filesStr.split(",");
259            for (String f : files) {
260                Element tagElement = new Element(tagName, ns);
261                if (f.contains("#")) {
262                    tagElement.addContent(f);
263                }
264                else {
265                    String filename = f.substring(f.lastIndexOf("/") + 1, f.length());
266                    if (filename == null || filename.isEmpty()) {
267                        tagElement.addContent(f);
268                    }
269                    else {
270                        tagElement.addContent(f + "#" + filename);
271                    }
272                }
273                X.addContent(tagElement);
274            }
275        }
276    }
277
278    /**
279     * Add file section in X.
280     *
281     * @param parent XML element to be appended
282     * @param conf Configuration object
283     * @param ns XML element namespace
284     */
285    static void addFileSection(Element X, Configuration conf, Namespace ns) {
286        String filesStr = conf.get(XOozieClient.FILES);
287        addSection(X, ns, filesStr, "file");
288    }
289
290    /**
291     * Add archive section in X.
292     *
293     * @param parent XML element to be appended
294     * @param conf Configuration object
295     * @param ns XML element namespace
296     */
297    static void addArchiveSection(Element X, Configuration conf, Namespace ns) {
298        String archivesStr = conf.get(XOozieClient.ARCHIVES);
299        addSection(X, ns, archivesStr, "archive");
300    }
301}