001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.hadoop.fs.Path; 022 import org.apache.hadoop.fs.FileSystem; 023 import org.apache.oozie.WorkflowJobBean; 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.service.HadoopAccessorException; 026 import org.apache.oozie.service.JPAService; 027 import org.apache.oozie.service.WorkflowStoreService; 028 import org.apache.oozie.service.WorkflowAppService; 029 import org.apache.oozie.service.HadoopAccessorService; 030 import org.apache.oozie.service.Services; 031 import org.apache.oozie.service.DagXLogInfoService; 032 import org.apache.oozie.util.ConfigUtils; 033 import org.apache.oozie.util.LogUtils; 034 import org.apache.oozie.util.XLog; 035 import org.apache.oozie.util.ParamChecker; 036 import org.apache.oozie.util.XConfiguration; 037 import org.apache.oozie.util.XmlUtils; 038 import org.apache.oozie.command.CommandException; 039 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor; 040 import org.apache.oozie.service.ELService; 041 import org.apache.oozie.service.SchemaService; 042 import org.apache.oozie.store.StoreException; 043 import org.apache.oozie.workflow.WorkflowApp; 044 import org.apache.oozie.workflow.WorkflowException; 045 import org.apache.oozie.workflow.WorkflowInstance; 046 import org.apache.oozie.workflow.WorkflowLib; 047 import org.apache.oozie.util.ELEvaluator; 048 import org.apache.oozie.util.InstrumentUtils; 049 import org.apache.oozie.util.PropertiesUtils; 050 import org.apache.oozie.util.db.SLADbOperations; 051 import org.apache.oozie.service.SchemaService.SchemaName; 052 import org.apache.oozie.client.OozieClient; 053 import org.apache.oozie.client.WorkflowJob; 054 import org.apache.oozie.client.SLAEvent.SlaAppType; 055 import org.jdom.Element; 056 import org.jdom.Namespace; 057 058 import java.util.Date; 059 import java.util.List; 060 import java.util.Map; 061 import java.util.Set; 062 import java.util.HashSet; 063 import java.io.IOException; 064 import java.net.URI; 065 066 public class SubmitXCommand extends WorkflowXCommand<String> { 067 public static final String CONFIG_DEFAULT = "config-default.xml"; 068 069 private Configuration conf; 070 private String authToken; 071 072 public SubmitXCommand(Configuration conf, String authToken) { 073 super("submit", "submit", 1); 074 this.conf = ParamChecker.notNull(conf, "conf"); 075 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 076 } 077 078 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 079 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 080 081 static { 082 String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES, 083 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB, 084 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, 085 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS}; 086 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 087 088 String[] badDefaultProps = {PropertiesUtils.HADOOP_USER}; 089 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 090 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 091 } 092 093 @Override 094 protected String execute() throws CommandException { 095 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 096 WorkflowAppService wps = Services.get().get(WorkflowAppService.class); 097 try { 098 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN)); 099 WorkflowApp app = wps.parseDef(conf, authToken); 100 XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true); 101 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB(); 102 103 String user = conf.get(OozieClient.USER_NAME); 104 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 105 URI uri = new URI(conf.get(OozieClient.APP_PATH)); 106 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 107 Configuration fsConf = has.createJobConf(uri.getAuthority()); 108 FileSystem fs = has.createFileSystem(user, uri, fsConf); 109 110 Path configDefault = null; 111 // app path could be a directory 112 Path path = new Path(uri.getPath()); 113 if (!fs.isFile(path)) { 114 configDefault = new Path(path, CONFIG_DEFAULT); 115 } else { 116 configDefault = new Path(path.getParent(), CONFIG_DEFAULT); 117 } 118 119 if (fs.exists(configDefault)) { 120 try { 121 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 122 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 123 XConfiguration.injectDefaults(defaultConf, conf); 124 } 125 catch (IOException ex) { 126 throw new IOException("default configuration file, " + ex.getMessage(), ex); 127 } 128 } 129 130 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 131 132 // Resolving all variables in the job properties. 133 // This ensures the Hadoop Configuration semantics is preserved. 134 XConfiguration resolvedVarsConf = new XConfiguration(); 135 for (Map.Entry<String, String> entry : conf) { 136 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 137 } 138 conf = resolvedVarsConf; 139 140 WorkflowInstance wfInstance; 141 try { 142 wfInstance = workflowLib.createInstance(app, conf); 143 } 144 catch (WorkflowException e) { 145 throw new StoreException(e); 146 } 147 148 Configuration conf = wfInstance.getConf(); 149 // System.out.println("WF INSTANCE CONF:"); 150 // System.out.println(XmlUtils.prettyPrint(conf).toString()); 151 152 WorkflowJobBean workflow = new WorkflowJobBean(); 153 workflow.setId(wfInstance.getId()); 154 workflow.setAppName(app.getName()); 155 workflow.setAppPath(conf.get(OozieClient.APP_PATH)); 156 workflow.setConf(XmlUtils.prettyPrint(conf).toString()); 157 workflow.setProtoActionConf(protoActionConf.toXmlString()); 158 workflow.setCreatedTime(new Date()); 159 workflow.setLastModifiedTime(new Date()); 160 workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, "")); 161 workflow.setStatus(WorkflowJob.Status.PREP); 162 workflow.setRun(0); 163 workflow.setUser(conf.get(OozieClient.USER_NAME)); 164 workflow.setGroup(conf.get(OozieClient.GROUP_NAME)); 165 workflow.setAuthToken(authToken); 166 workflow.setWorkflowInstance(wfInstance); 167 workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID)); 168 169 LogUtils.setLogInfo(workflow, logInfo); 170 LOG = XLog.resetPrefix(LOG); 171 LOG.debug("Workflow record created, Status [{0}]", workflow.getStatus()); 172 Element wfElem = XmlUtils.parseXml(app.getDefinition()); 173 ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit"); 174 String jobSlaXml = verifySlaElements(wfElem, evalSla); 175 writeSLARegistration(jobSlaXml, workflow.getId(), workflow.getUser(), workflow.getGroup(), LOG); 176 workflow.setSlaXml(jobSlaXml); 177 // System.out.println("SlaXml :"+ slaXml); 178 179 //store.insertWorkflow(workflow); 180 JPAService jpaService = Services.get().get(JPAService.class); 181 if (jpaService != null) { 182 jpaService.execute(new WorkflowJobInsertJPAExecutor(workflow)); 183 } 184 else { 185 LOG.error(ErrorCode.E0610); 186 return null; 187 } 188 189 return workflow.getId(); 190 } 191 catch (WorkflowException ex) { 192 throw new CommandException(ex); 193 } 194 catch (HadoopAccessorException ex) { 195 throw new CommandException(ex); 196 } 197 catch (Exception ex) { 198 throw new CommandException(ErrorCode.E0803, ex); 199 } 200 } 201 202 private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException { 203 String jobSlaXml = ""; 204 // Validate WF job 205 Element eSla = eWfJob.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI)); 206 if (eSla != null) { 207 jobSlaXml = resolveSla(eSla, evalSla); 208 } 209 210 // Validate all actions 211 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) { 212 eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI)); 213 if (eSla != null) { 214 resolveSla(eSla, evalSla); 215 } 216 } 217 return jobSlaXml; 218 } 219 220 private void writeSLARegistration(String slaXml, String id, String user, String group, XLog log) 221 throws CommandException { 222 try { 223 if (slaXml != null && slaXml.length() > 0) { 224 Element eSla = XmlUtils.parseXml(slaXml); 225 SLADbOperations.writeSlaRegistrationEvent(eSla, id, SlaAppType.WORKFLOW_JOB, user, group, log); 226 } 227 } 228 catch (Exception e) { 229 e.printStackTrace(); 230 throw new CommandException(ErrorCode.E1007, "workflow " + id, e); 231 } 232 } 233 234 /** 235 * Resolve variables in sla xml element. 236 * 237 * @param eSla sla xml element 238 * @param evalSla sla evaluator 239 * @return sla xml string after evaluation 240 * @throws CommandException 241 */ 242 public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException { 243 // EL evaluation 244 String slaXml = XmlUtils.prettyPrint(eSla).toString(); 245 try { 246 slaXml = XmlUtils.removeComments(slaXml); 247 slaXml = evalSla.evaluate(slaXml, String.class); 248 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL); 249 return slaXml; 250 } 251 catch (Exception e) { 252 throw new CommandException(ErrorCode.E1004, "Validation erro :" + e.getMessage(), e); 253 } 254 } 255 256 /** 257 * Create an EL evaluator for a given group. 258 * 259 * @param conf configuration variable 260 * @param group group variable 261 * @return the evaluator created for the group 262 */ 263 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) { 264 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group); 265 for (Map.Entry<String, String> entry : conf) { 266 eval.setVariable(entry.getKey(), entry.getValue()); 267 } 268 return eval; 269 } 270 271 @Override 272 public String getEntityKey() { 273 return null; 274 } 275 276 @Override 277 protected boolean isLockRequired() { 278 return false; 279 } 280 281 @Override 282 protected void loadState() { 283 284 } 285 286 @Override 287 protected void verifyPrecondition() throws CommandException { 288 289 } 290 291 }