This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.oozie.WorkflowJobBean;
022 import org.apache.oozie.ErrorCode;
023 import org.apache.oozie.service.JPAService;
024 import org.apache.oozie.service.WorkflowStoreService;
025 import org.apache.oozie.service.WorkflowAppService;
026 import org.apache.oozie.service.Services;
027 import org.apache.oozie.service.DagXLogInfoService;
028 import org.apache.oozie.util.InstrumentUtils;
029 import org.apache.oozie.util.LogUtils;
030 import org.apache.oozie.util.XLog;
031 import org.apache.oozie.util.ParamChecker;
032 import org.apache.oozie.util.XConfiguration;
033 import org.apache.oozie.util.XmlUtils;
034 import org.apache.oozie.command.CommandException;
035 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
036 import org.apache.oozie.store.StoreException;
037 import org.apache.oozie.workflow.WorkflowApp;
038 import org.apache.oozie.workflow.WorkflowException;
039 import org.apache.oozie.workflow.WorkflowInstance;
040 import org.apache.oozie.workflow.WorkflowLib;
041 import org.apache.oozie.util.PropertiesUtils;
042 import org.apache.oozie.client.OozieClient;
043 import org.apache.oozie.client.WorkflowJob;
044 import org.apache.oozie.client.XOozieClient;
045 import org.jdom.Element;
046 import org.jdom.Namespace;
047
048 import java.util.Date;
049 import java.util.Map;
050 import java.util.Set;
051 import java.util.HashSet;
052
053 public abstract class SubmitHttpXCommand extends WorkflowXCommand<String> {
054
055 protected static final Set<String> MANDATORY_OOZIE_CONFS = new HashSet<String>();
056 protected static final Set<String> OPTIONAL_OOZIE_CONFS = new HashSet<String>();
057
058 static {
059 MANDATORY_OOZIE_CONFS.add(XOozieClient.JT);
060 MANDATORY_OOZIE_CONFS.add(XOozieClient.NN);
061 MANDATORY_OOZIE_CONFS.add(OozieClient.LIBPATH);
062
063 OPTIONAL_OOZIE_CONFS.add(XOozieClient.FILES);
064 OPTIONAL_OOZIE_CONFS.add(XOozieClient.ARCHIVES);
065 }
066
067 private Configuration conf;
068 private String authToken;
069
070 public SubmitHttpXCommand(String name, String type, Configuration conf, String authToken) {
071 super(name, type, 1);
072 this.conf = ParamChecker.notNull(conf, "conf");
073 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
074 }
075
076 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
077 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
078
079 static {
080 String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
081 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
082 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
083 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
084 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
085
086 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
087 WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME };
088 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
089 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
090 }
091
092 /**
093 * Generate workflow xml from conf object
094 *
095 * @param conf the configuration object
096 * @return workflow xml def string representation
097 */
098 abstract protected String getWorkflowXml(Configuration conf);
099
100 /* (non-Javadoc)
101 * @see org.apache.oozie.command.XCommand#execute()
102 */
103 @Override
104 protected String execute() throws CommandException {
105 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
106 WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
107 try {
108 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
109 String wfXml = getWorkflowXml(conf);
110 LOG.debug("workflow xml created on the server side is :\n");
111 LOG.debug(wfXml);
112 WorkflowApp app = wps.parseDef(wfXml);
113 XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, false);
114 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
115
116 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
117
118 // Resolving all variables in the job properties.
119 // This ensures the Hadoop Configuration semantics is preserved.
120 XConfiguration resolvedVarsConf = new XConfiguration();
121 for (Map.Entry<String, String> entry : conf) {
122 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
123 }
124 conf = resolvedVarsConf;
125
126 WorkflowInstance wfInstance;
127 try {
128 wfInstance = workflowLib.createInstance(app, conf);
129 }
130 catch (WorkflowException e) {
131 throw new StoreException(e);
132 }
133
134 Configuration conf = wfInstance.getConf();
135
136 WorkflowJobBean workflow = new WorkflowJobBean();
137 workflow.setId(wfInstance.getId());
138 workflow.setAppName(app.getName());
139 workflow.setAppPath(conf.get(OozieClient.APP_PATH));
140 workflow.setConf(XmlUtils.prettyPrint(conf).toString());
141 workflow.setProtoActionConf(protoActionConf.toXmlString());
142 workflow.setCreatedTime(new Date());
143 workflow.setLastModifiedTime(new Date());
144 workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
145 workflow.setStatus(WorkflowJob.Status.PREP);
146 workflow.setRun(0);
147 workflow.setUser(conf.get(OozieClient.USER_NAME));
148 workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
149 workflow.setAuthToken(authToken);
150 workflow.setWorkflowInstance(wfInstance);
151 workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
152
153 LogUtils.setLogInfo(workflow, logInfo);
154 JPAService jpaService = Services.get().get(JPAService.class);
155 if (jpaService != null) {
156 jpaService.execute(new WorkflowJobInsertJPAExecutor(workflow));
157 }
158 else {
159 LOG.error(ErrorCode.E0610);
160 return null;
161 }
162
163 return workflow.getId();
164 }
165 catch (WorkflowException ex) {
166 throw new CommandException(ex);
167 }
168 catch (Exception ex) {
169 throw new CommandException(ErrorCode.E0803, ex);
170 }
171 }
172
173 static private void addSection(Element X, Namespace ns, String filesStr, String tagName) {
174 if (filesStr != null) {
175 String[] files = filesStr.split(",");
176 for (String f : files) {
177 Element tagElement = new Element(tagName, ns);
178 if (f.contains("#")) {
179 tagElement.addContent(f);
180 }
181 else {
182 String filename = f.substring(f.lastIndexOf("/") + 1, f.length());
183 if (filename == null || filename.isEmpty()) {
184 tagElement.addContent(f);
185 }
186 else {
187 tagElement.addContent(f + "#" + filename);
188 }
189 }
190 X.addContent(tagElement);
191 }
192 }
193 }
194
195 /**
196 * Add file section in X.
197 *
198 * @param parent XML element to be appended
199 * @param conf Configuration object
200 * @param ns XML element namespace
201 */
202 static void addFileSection(Element X, Configuration conf, Namespace ns) {
203 String filesStr = conf.get(XOozieClient.FILES);
204 addSection(X, ns, filesStr, "file");
205 }
206
207 /**
208 * Add archive section in X.
209 *
210 * @param parent XML element to be appended
211 * @param conf Configuration object
212 * @param ns XML element namespace
213 */
214 static void addArchiveSection(Element X, Configuration conf, Namespace ns) {
215 String archivesStr = conf.get(XOozieClient.ARCHIVES);
216 addSection(X, ns, archivesStr, "archive");
217 }
218 }