001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.oozie.WorkflowJobBean;
022    import org.apache.oozie.ErrorCode;
023    import org.apache.oozie.service.JPAService;
024    import org.apache.oozie.service.WorkflowStoreService;
025    import org.apache.oozie.service.WorkflowAppService;
026    import org.apache.oozie.service.Services;
027    import org.apache.oozie.service.DagXLogInfoService;
028    import org.apache.oozie.util.InstrumentUtils;
029    import org.apache.oozie.util.LogUtils;
030    import org.apache.oozie.util.XLog;
031    import org.apache.oozie.util.ParamChecker;
032    import org.apache.oozie.util.XConfiguration;
033    import org.apache.oozie.util.XmlUtils;
034    import org.apache.oozie.command.CommandException;
035    import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
036    import org.apache.oozie.store.StoreException;
037    import org.apache.oozie.workflow.WorkflowApp;
038    import org.apache.oozie.workflow.WorkflowException;
039    import org.apache.oozie.workflow.WorkflowInstance;
040    import org.apache.oozie.workflow.WorkflowLib;
041    import org.apache.oozie.util.PropertiesUtils;
042    import org.apache.oozie.client.OozieClient;
043    import org.apache.oozie.client.WorkflowJob;
044    import org.apache.oozie.client.XOozieClient;
045    import org.jdom.Element;
046    import org.jdom.Namespace;
047    
048    import java.util.Date;
049    import java.util.Map;
050    import java.util.Set;
051    import java.util.HashSet;
052    
053    public abstract class SubmitHttpXCommand extends WorkflowXCommand<String> {
054    
055        protected static final Set<String> MANDATORY_OOZIE_CONFS = new HashSet<String>();
056        protected static final Set<String> OPTIONAL_OOZIE_CONFS = new HashSet<String>();
057    
058        static {
059            MANDATORY_OOZIE_CONFS.add(XOozieClient.JT);
060            MANDATORY_OOZIE_CONFS.add(XOozieClient.NN);
061            MANDATORY_OOZIE_CONFS.add(OozieClient.LIBPATH);
062    
063            OPTIONAL_OOZIE_CONFS.add(XOozieClient.FILES);
064            OPTIONAL_OOZIE_CONFS.add(XOozieClient.ARCHIVES);
065        }
066    
067        private Configuration conf;
068        private String authToken;
069    
070        public SubmitHttpXCommand(String name, String type, Configuration conf, String authToken) {
071            super(name, type, 1);
072            this.conf = ParamChecker.notNull(conf, "conf");
073            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
074        }
075    
076        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
077        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
078    
079        static {
080            String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
081                    PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
082                    PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
083                    PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
084            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
085    
086            String[] badDefaultProps = { PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
087                    WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME };
088            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
089            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
090        }
091    
092        /**
093         * Generate workflow xml from conf object
094         *
095         * @param conf the configuration object
096         * @return workflow xml def string representation
097         */
098        abstract protected String getWorkflowXml(Configuration conf);
099    
100        /* (non-Javadoc)
101         * @see org.apache.oozie.command.XCommand#execute()
102         */
103        @Override
104        protected String execute() throws CommandException {
105            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
106            WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
107            try {
108                XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
109                String wfXml = getWorkflowXml(conf);
110                LOG.debug("workflow xml created on the server side is :\n");
111                LOG.debug(wfXml);
112                WorkflowApp app = wps.parseDef(wfXml);
113                XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, false);
114                WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
115    
116                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
117    
118                // Resolving all variables in the job properties.
119                // This ensures the Hadoop Configuration semantics is preserved.
120                XConfiguration resolvedVarsConf = new XConfiguration();
121                for (Map.Entry<String, String> entry : conf) {
122                    resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
123                }
124                conf = resolvedVarsConf;
125    
126                WorkflowInstance wfInstance;
127                try {
128                    wfInstance = workflowLib.createInstance(app, conf);
129                }
130                catch (WorkflowException e) {
131                    throw new StoreException(e);
132                }
133    
134                Configuration conf = wfInstance.getConf();
135    
136                WorkflowJobBean workflow = new WorkflowJobBean();
137                workflow.setId(wfInstance.getId());
138                workflow.setAppName(app.getName());
139                workflow.setAppPath(conf.get(OozieClient.APP_PATH));
140                workflow.setConf(XmlUtils.prettyPrint(conf).toString());
141                workflow.setProtoActionConf(protoActionConf.toXmlString());
142                workflow.setCreatedTime(new Date());
143                workflow.setLastModifiedTime(new Date());
144                workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
145                workflow.setStatus(WorkflowJob.Status.PREP);
146                workflow.setRun(0);
147                workflow.setUser(conf.get(OozieClient.USER_NAME));
148                workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
149                workflow.setAuthToken(authToken);
150                workflow.setWorkflowInstance(wfInstance);
151                workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
152    
153                LogUtils.setLogInfo(workflow, logInfo);
154                JPAService jpaService = Services.get().get(JPAService.class);
155                if (jpaService != null) {
156                    jpaService.execute(new WorkflowJobInsertJPAExecutor(workflow));
157                }
158                else {
159                    LOG.error(ErrorCode.E0610);
160                    return null;
161                }
162    
163                return workflow.getId();
164            }
165            catch (WorkflowException ex) {
166                throw new CommandException(ex);
167            }
168            catch (Exception ex) {
169                throw new CommandException(ErrorCode.E0803, ex);
170            }
171        }
172    
173        static private void addSection(Element X, Namespace ns, String filesStr, String tagName) {
174            if (filesStr != null) {
175                String[] files = filesStr.split(",");
176                for (String f : files) {
177                    Element tagElement = new Element(tagName, ns);
178                    if (f.contains("#")) {
179                        tagElement.addContent(f);
180                    }
181                    else {
182                        String filename = f.substring(f.lastIndexOf("/") + 1, f.length());
183                        if (filename == null || filename.isEmpty()) {
184                            tagElement.addContent(f);
185                        }
186                        else {
187                            tagElement.addContent(f + "#" + filename);
188                        }
189                    }
190                    X.addContent(tagElement);
191                }
192            }
193        }
194    
195        /**
196         * Add file section in X.
197         *
198         * @param parent XML element to be appended
199         * @param conf Configuration object
200         * @param ns XML element namespace
201         */
202        static void addFileSection(Element X, Configuration conf, Namespace ns) {
203            String filesStr = conf.get(XOozieClient.FILES);
204            addSection(X, ns, filesStr, "file");
205        }
206    
207        /**
208         * Add archive section in X.
209         *
210         * @param parent XML element to be appended
211         * @param conf Configuration object
212         * @param ns XML element namespace
213         */
214        static void addArchiveSection(Element X, Configuration conf, Namespace ns) {
215            String archivesStr = conf.get(XOozieClient.ARCHIVES);
216            addSection(X, ns, archivesStr, "archive");
217        }
218    }