This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.oozie.WorkflowJobBean;
022 import org.apache.oozie.ErrorCode;
023 import org.apache.oozie.service.WorkflowStoreService;
024 import org.apache.oozie.service.WorkflowAppService;
025 import org.apache.oozie.service.Services;
026 import org.apache.oozie.service.DagXLogInfoService;
027 import org.apache.oozie.util.XLog;
028 import org.apache.oozie.util.ParamChecker;
029 import org.apache.oozie.util.XConfiguration;
030 import org.apache.oozie.util.XmlUtils;
031 import org.apache.oozie.command.CommandException;
032 import org.apache.oozie.store.StoreException;
033 import org.apache.oozie.store.WorkflowStore;
034 import org.apache.oozie.workflow.WorkflowApp;
035 import org.apache.oozie.workflow.WorkflowException;
036 import org.apache.oozie.workflow.WorkflowInstance;
037 import org.apache.oozie.workflow.WorkflowLib;
038 import org.apache.oozie.util.PropertiesUtils;
039 import org.apache.oozie.client.OozieClient;
040 import org.apache.oozie.client.WorkflowJob;
041 import org.apache.oozie.client.XOozieClient;
042 import org.jdom.Element;
043 import org.jdom.Namespace;
044
045 import java.util.Date;
046 import java.util.Map;
047 import java.util.Set;
048 import java.util.HashSet;
049
050 public abstract class SubmitHttpCommand extends WorkflowCommand<String> {
051 public static final String USE_SYSTEM_LIBPATH_FOR_MR_PIG_JOBS ="use.system.libpath.for.mapreduce.and.pig.jobs";
052
053 protected static final Set<String> MANDATORY_OOZIE_CONFS = new HashSet<String>();
054 protected static final Set<String> OPTIONAL_OOZIE_CONFS = new HashSet<String>();
055
056 static {
057 MANDATORY_OOZIE_CONFS.add(XOozieClient.JT);
058 MANDATORY_OOZIE_CONFS.add(XOozieClient.NN);
059
060 OPTIONAL_OOZIE_CONFS.add(XOozieClient.FILES);
061 OPTIONAL_OOZIE_CONFS.add(XOozieClient.ARCHIVES);
062 }
063
064 private Configuration conf;
065 private String authToken;
066
067 public SubmitHttpCommand(String name, String type, Configuration conf, String authToken) {
068 super(name, type, 1, XLog.STD);
069 this.conf = ParamChecker.notNull(conf, "conf");
070 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
071 }
072
073 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
074 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
075
076 static {
077 String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
078 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
079 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
080 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
081 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
082
083 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
084 WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME };
085 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
086 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
087 }
088
089
090 /**
091 * Generate workflow xml from conf object
092 *
093 * @param conf the configuration object
094 * @return workflow xml def string representation
095 */
096 abstract protected String getWorkflowXml(Configuration conf);
097
098 @Override
099 protected String call(WorkflowStore store) throws StoreException, CommandException {
100 incrJobCounter(1);
101 WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
102 try {
103 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
104 String wfXml = getWorkflowXml(conf);
105 XLog.getLog(getClass()).debug("workflow xml created on the server side is :\n");
106 XLog.getLog(getClass()).debug(wfXml);
107 WorkflowApp app = wps.parseDef(wfXml);
108
109 if (conf.get(OozieClient.USE_SYSTEM_LIBPATH) == null) {
110 if (Services.get().getConf().getBoolean(USE_SYSTEM_LIBPATH_FOR_MR_PIG_JOBS, false)) {
111 conf.setBoolean(OozieClient.USE_SYSTEM_LIBPATH, true);
112 }
113 }
114
115 XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, false);
116 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
117
118 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
119
120 // Resolving all variables in the job properties.
121 // This ensures the Hadoop Configuration semantics is preserved.
122 XConfiguration resolvedVarsConf = new XConfiguration();
123 for (Map.Entry<String, String> entry : conf) {
124 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
125 }
126 conf = resolvedVarsConf;
127
128 WorkflowInstance wfInstance;
129 try {
130 wfInstance = workflowLib.createInstance(app, conf);
131 }
132 catch (WorkflowException e) {
133 throw new StoreException(e);
134 }
135
136 Configuration conf = wfInstance.getConf();
137
138 WorkflowJobBean workflow = new WorkflowJobBean();
139 workflow.setId(wfInstance.getId());
140 workflow.setAppName(app.getName());
141 workflow.setAppPath(conf.get(OozieClient.APP_PATH));
142 workflow.setConf(XmlUtils.prettyPrint(conf).toString());
143 workflow.setProtoActionConf(protoActionConf.toXmlString());
144 workflow.setCreatedTime(new Date());
145 workflow.setLastModifiedTime(new Date());
146 workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
147 workflow.setStatus(WorkflowJob.Status.PREP);
148 workflow.setRun(0);
149 workflow.setUser(conf.get(OozieClient.USER_NAME));
150 workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
151 workflow.setAuthToken(authToken);
152 workflow.setWorkflowInstance(wfInstance);
153 workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
154
155 setLogInfo(workflow);
156 store.insertWorkflow(workflow);
157
158 return workflow.getId();
159 }
160 catch (WorkflowException ex) {
161 throw new CommandException(ex);
162 }
163 catch (Exception ex) {
164 throw new CommandException(ErrorCode.E0803, ex);
165 }
166 }
167
168 static private void addSection(Element X, Namespace ns, String filesStr, String tagName) {
169 if (filesStr != null) {
170 String[] files = filesStr.split(",");
171 for (String f : files) {
172 Element tagElement = new Element(tagName, ns);
173 if (f.contains("#")) {
174 tagElement.addContent(f);
175 }
176 else {
177 String filename = f.substring(f.lastIndexOf("/") + 1, f.length());
178 if (filename == null || filename.isEmpty()) {
179 tagElement.addContent(f);
180 }
181 else {
182 tagElement.addContent(f + "#" + filename);
183 }
184 }
185 X.addContent(tagElement);
186 }
187 }
188 }
189
190 /**
191 * Add file section in X.
192 *
193 * @param parent XML element to be appended
194 * @param conf Configuration object
195 * @param ns XML element namespace
196 */
197 static void addFileSection(Element X, Configuration conf, Namespace ns) {
198 String filesStr = conf.get(XOozieClient.FILES);
199 addSection(X, ns, filesStr, "file");
200 }
201
202 /**
203 * Add archive section in X.
204 *
205 * @param parent XML element to be appended
206 * @param conf Configuration object
207 * @param ns XML element namespace
208 */
209 static void addArchiveSection(Element X, Configuration conf, Namespace ns) {
210 String archivesStr = conf.get(XOozieClient.ARCHIVES);
211 addSection(X, ns, archivesStr, "archive");
212 }
213 }