001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.oozie.WorkflowJobBean;
022    import org.apache.oozie.ErrorCode;
023    import org.apache.oozie.service.WorkflowStoreService;
024    import org.apache.oozie.service.WorkflowAppService;
025    import org.apache.oozie.service.Services;
026    import org.apache.oozie.service.DagXLogInfoService;
027    import org.apache.oozie.util.XLog;
028    import org.apache.oozie.util.ParamChecker;
029    import org.apache.oozie.util.XConfiguration;
030    import org.apache.oozie.util.XmlUtils;
031    import org.apache.oozie.command.CommandException;
032    import org.apache.oozie.store.StoreException;
033    import org.apache.oozie.store.WorkflowStore;
034    import org.apache.oozie.workflow.WorkflowApp;
035    import org.apache.oozie.workflow.WorkflowException;
036    import org.apache.oozie.workflow.WorkflowInstance;
037    import org.apache.oozie.workflow.WorkflowLib;
038    import org.apache.oozie.util.PropertiesUtils;
039    import org.apache.oozie.client.OozieClient;
040    import org.apache.oozie.client.WorkflowJob;
041    import org.apache.oozie.client.XOozieClient;
042    import org.jdom.Element;
043    import org.jdom.Namespace;
044    
045    import java.util.Date;
046    import java.util.Map;
047    import java.util.Set;
048    import java.util.HashSet;
049    
050    public abstract class SubmitHttpCommand extends WorkflowCommand<String> {
051        public static final String USE_SYSTEM_LIBPATH_FOR_MR_PIG_JOBS ="use.system.libpath.for.mapreduce.and.pig.jobs";
052    
053        protected static final Set<String> MANDATORY_OOZIE_CONFS = new HashSet<String>();
054        protected static final Set<String> OPTIONAL_OOZIE_CONFS = new HashSet<String>();
055    
056        static {
057            MANDATORY_OOZIE_CONFS.add(XOozieClient.JT);
058            MANDATORY_OOZIE_CONFS.add(XOozieClient.NN);
059    
060            OPTIONAL_OOZIE_CONFS.add(XOozieClient.FILES);
061            OPTIONAL_OOZIE_CONFS.add(XOozieClient.ARCHIVES);
062        }
063    
064        private Configuration conf;
065        private String authToken;
066    
067        public SubmitHttpCommand(String name, String type, Configuration conf, String authToken) {
068            super(name, type, 1, XLog.STD);
069            this.conf = ParamChecker.notNull(conf, "conf");
070            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
071        }
072    
073        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
074        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
075    
076        static {
077            String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
078                    PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
079                    PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
080                    PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
081            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
082    
083            String[] badDefaultProps = { PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
084                    WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME };
085            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
086            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
087        }
088    
089    
090        /**
091         * Generate workflow xml from conf object
092         *
093         * @param conf the configuration object
094         * @return workflow xml def string representation
095         */
096        abstract protected String getWorkflowXml(Configuration conf);
097    
098        @Override
099        protected String call(WorkflowStore store) throws StoreException, CommandException {
100            incrJobCounter(1);
101            WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
102            try {
103                XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
104                String wfXml = getWorkflowXml(conf);
105                XLog.getLog(getClass()).debug("workflow xml created on the server side is :\n");
106                XLog.getLog(getClass()).debug(wfXml);
107                WorkflowApp app = wps.parseDef(wfXml);
108    
109                if (conf.get(OozieClient.USE_SYSTEM_LIBPATH) == null) {
110                    if (Services.get().getConf().getBoolean(USE_SYSTEM_LIBPATH_FOR_MR_PIG_JOBS, false)) {
111                        conf.setBoolean(OozieClient.USE_SYSTEM_LIBPATH, true);
112                    }
113                }
114    
115                XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, false);
116                WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
117    
118                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
119    
120                // Resolving all variables in the job properties.
121                // This ensures the Hadoop Configuration semantics is preserved.
122                XConfiguration resolvedVarsConf = new XConfiguration();
123                for (Map.Entry<String, String> entry : conf) {
124                    resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
125                }
126                conf = resolvedVarsConf;
127    
128                WorkflowInstance wfInstance;
129                try {
130                    wfInstance = workflowLib.createInstance(app, conf);
131                }
132                catch (WorkflowException e) {
133                    throw new StoreException(e);
134                }
135    
136                Configuration conf = wfInstance.getConf();
137    
138                WorkflowJobBean workflow = new WorkflowJobBean();
139                workflow.setId(wfInstance.getId());
140                workflow.setAppName(app.getName());
141                workflow.setAppPath(conf.get(OozieClient.APP_PATH));
142                workflow.setConf(XmlUtils.prettyPrint(conf).toString());
143                workflow.setProtoActionConf(protoActionConf.toXmlString());
144                workflow.setCreatedTime(new Date());
145                workflow.setLastModifiedTime(new Date());
146                workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
147                workflow.setStatus(WorkflowJob.Status.PREP);
148                workflow.setRun(0);
149                workflow.setUser(conf.get(OozieClient.USER_NAME));
150                workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
151                workflow.setAuthToken(authToken);
152                workflow.setWorkflowInstance(wfInstance);
153                workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
154    
155                setLogInfo(workflow);
156                store.insertWorkflow(workflow);
157    
158                return workflow.getId();
159            }
160            catch (WorkflowException ex) {
161                throw new CommandException(ex);
162            }
163            catch (Exception ex) {
164                throw new CommandException(ErrorCode.E0803, ex);
165            }
166        }
167    
168        static private void addSection(Element X, Namespace ns, String filesStr, String tagName) {
169            if (filesStr != null) {
170                String[] files = filesStr.split(",");
171                for (String f : files) {
172                    Element tagElement = new Element(tagName, ns);
173                    if (f.contains("#")) {
174                        tagElement.addContent(f);
175                    }
176                    else {
177                        String filename = f.substring(f.lastIndexOf("/") + 1, f.length());
178                        if (filename == null || filename.isEmpty()) {
179                            tagElement.addContent(f);
180                        }
181                        else {
182                            tagElement.addContent(f + "#" + filename);
183                        }
184                    }
185                    X.addContent(tagElement);
186                }
187            }
188        }
189    
190        /**
191         * Add file section in X.
192         *
193         * @param parent XML element to be appended
194         * @param conf Configuration object
195         * @param ns XML element namespace
196         */
197        static void addFileSection(Element X, Configuration conf, Namespace ns) {
198            String filesStr = conf.get(XOozieClient.FILES);
199            addSection(X, ns, filesStr, "file");
200        }
201    
202        /**
203         * Add archive section in X.
204         *
205         * @param parent XML element to be appended
206         * @param conf Configuration object
207         * @param ns XML element namespace
208         */
209        static void addArchiveSection(Element X, Configuration conf, Namespace ns) {
210            String archivesStr = conf.get(XOozieClient.ARCHIVES);
211            addSection(X, ns, archivesStr, "archive");
212        }
213    }