/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.coord;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Validator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.CoordinatorJob.Execution;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.SubmitTransitionXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.coord.CoordinatorJobException;
import org.apache.oozie.coord.TimeUnit;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.SchemaService.SchemaName;
import org.apache.oozie.service.UUIDService.ApplicationType;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Attribute;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;
import org.xml.sax.SAXException;

/**
 * This class provides the functionality to resolve a coordinator job XML and write the job information into a DB
 * table.
 * <p/>
 * Specifically it performs the following functions:
 * 1. Resolve all the variables or properties using the job configuration.
 * 2. Insert all dataset definitions as part of the <data-in> and <data-out> tags.
 * 3. Validate the XML at runtime.
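 * <p/>
 * Typical usage (sketch; the coordinator engine normally creates and runs this command, and the variable names
 * below are illustrative only):
 * <pre>
 *   String jobId = new CoordSubmitXCommand(jobConf, authToken).call();
 * </pre>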
 */
public class CoordSubmitXCommand extends SubmitTransitionXCommand {

    private Configuration conf;
    private final String authToken;
    private final String bundleId;
    private final String coordName;
    private boolean dryrun;
    private JPAService jpaService = null;
    private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP;

    public static final String CONFIG_DEFAULT = "coord-config-default.xml";
    public static final String COORDINATOR_XML_FILE = "coordinator.xml";

    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();

    private CoordinatorJobBean coordJob = null;

    /**
     * Default timeout for normal jobs, in minutes, after which the coordinator input check will time out
     */
    public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";

    public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency";

    public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle";

    public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX
            + "coord.materialization.throttling.factor";

    /**
     * Default maximum timeout, in minutes, after which the coordinator input check will time out
     */
    public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";

    public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";

    private ELEvaluator evalFreq = null;
    private ELEvaluator evalNofuncs = null;
    private ELEvaluator evalData = null;
    private ELEvaluator evalInst = null;
    private ELEvaluator evalSla = null;
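
    // Properties that may not appear in the submitted job configuration (the built-in EL constants) or in
    // coord-config-default.xml (the EL constants plus the Hadoop user/UGI/Kerberos settings).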
    static {
        String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
                PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
                PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
                PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
                PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);

        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
                WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME };
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
    }

    /**
     * Constructor to create the Coordinator Submit Command.
     *
     * @param conf : Configuration for Coordinator job
     * @param authToken : To be used for authentication
     */
    public CoordSubmitXCommand(Configuration conf, String authToken) {
        super("coord_submit", "coord_submit", 1);
        this.conf = ParamChecker.notNull(conf, "conf");
        this.authToken = ParamChecker.notEmpty(authToken, "authToken");
        this.bundleId = null;
        this.coordName = null;
    }

    /**
     * Constructor to create the Coordinator Submit Command by bundle job.
     *
     * @param conf : Configuration for Coordinator job
     * @param authToken : To be used for authentication
     * @param bundleId : bundle id
     * @param coordName : coord name
     */
    public CoordSubmitXCommand(Configuration conf, String authToken, String bundleId, String coordName) {
        super("coord_submit", "coord_submit", 1);
        this.conf = ParamChecker.notNull(conf, "conf");
        this.authToken = ParamChecker.notEmpty(authToken, "authToken");
        this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId");
        this.coordName = ParamChecker.notEmpty(coordName, "coordName");
    }

    /**
     * Constructor to create the Coordinator Submit Command.
     *
     * @param dryrun : if dryrun
     * @param conf : Configuration for Coordinator job
     * @param authToken : To be used for authentication
     */
    public CoordSubmitXCommand(boolean dryrun, Configuration conf, String authToken) {
        this(conf, authToken);
        this.dryrun = dryrun;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#execute()
     */
    @Override
    protected String submit() throws CommandException {
        String jobId = null;
        LOG.info("STARTED Coordinator Submit");
        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());

        boolean exceptionOccurred = false;
        try {
            mergeDefaultConfig();

            String appXml = readAndValidateXml();
            coordJob.setOrigJobXml(appXml);
            LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());

            String appNamespace = readAppNamespace(appXml);
            coordJob.setAppNamespace(appNamespace);

            appXml = XmlUtils.removeComments(appXml);
            initEvaluators();
            Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob);
            LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString());

            jobId = storeToDB(eJob, coordJob);
            // log job info for coordinator job
            LogUtils.setLogInfo(coordJob, logInfo);
            LOG = XLog.resetPrefix(LOG);

            if (!dryrun) {
                // submit a command to materialize jobs for the next 1 hour (3600 secs)
                // so we don't wait 10 mins for the Service to run.
                queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100);
            }
            else {
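                // Dryrun: nothing is queued or persisted (storeToDB skips the insert); materialize up to one
                // hour of actions in memory, capped at the job end time, and return them with the job XML.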
                Date startTime = coordJob.getStartTime();
                long startTimeMilli = startTime.getTime();
                long endTimeMilli = startTimeMilli + (3600 * 1000);
                Date jobEndTime = coordJob.getEndTime();
                Date endTime = new Date(endTimeMilli);
                if (endTime.compareTo(jobEndTime) > 0) {
                    endTime = jobEndTime;
                }
                jobId = coordJob.getId();
                LOG.info("[" + jobId + "]: Update status to RUNNING");
                coordJob.setStatus(Job.Status.RUNNING);
                coordJob.setPending();
                CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime,
                        endTime);
                Configuration jobConf = null;
                try {
                    jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
                }
                catch (IOException e1) {
                    LOG.warn("Configuration parse error reading from DB: " + coordJob.getConf(), e1);
                }
                String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null);
                String output = coordJob.getJobXml() + System.getProperty("line.separator")
                        + "***actions for instance***" + action;
                return output;
            }
        }
        catch (CoordinatorJobException cex) {
            exceptionOccurred = true;
            LOG.warn("ERROR: ", cex);
            throw new CommandException(cex);
        }
        catch (IllegalArgumentException iex) {
            exceptionOccurred = true;
            LOG.warn("ERROR: ", iex);
            throw new CommandException(ErrorCode.E1003, iex);
        }
        catch (Exception ex) {
            exceptionOccurred = true;
            LOG.warn("ERROR: ", ex);
            throw new CommandException(ErrorCode.E0803, ex);
        }
        finally {
            if (exceptionOccurred) {
                if (coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")) {
                    coordJob.setStatus(CoordinatorJob.Status.FAILED);
                    coordJob.resetPending();
                }
            }
        }

        LOG.info("ENDED Coordinator Submit jobId=" + jobId);
        return jobId;
    }

    /**
     * Read the application XML and validate against coordinator Schema
     *
     * @return validated coordinator XML
     * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml
     */
    private String readAndValidateXml() throws CoordinatorJobException {
        String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH),
                OozieClient.COORDINATOR_APP_PATH);
        String coordXml = readDefinition(appPath);
        validateXml(coordXml);
        return coordXml;
    }

    /**
     * Validate against Coordinator XSD file
     *
     * @param xmlContent : Input coordinator xml
     * @throws CoordinatorJobException thrown if unable to validate coordinator xml
     */
    private void validateXml(String xmlContent) throws CoordinatorJobException {
        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR);
        Validator validator = schema.newValidator();
        try {
            validator.validate(new StreamSource(new StringReader(xmlContent)));
        }
        catch (SAXException ex) {
            LOG.warn("SAXException :", ex);
            throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            LOG.warn("IOException :", ex);
            throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex);
        }
    }

    /**
     * Read the application XML schema namespace
     *
     * @param xmlContent input coordinator xml
     * @return app xml namespace
     * @throws CoordinatorJobException thrown if unable to parse the coordinator xml or the namespace is missing
     */
    private String readAppNamespace(String xmlContent) throws CoordinatorJobException {
        try {
            Element coordXmlElement = XmlUtils.parseXml(xmlContent);
            Namespace ns = coordXmlElement.getNamespace();
            if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
                throw new CoordinatorJobException(ErrorCode.E1319, "bundle app cannot submit coordinator namespace "
                        + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later");
            }
            if (ns != null) {
                return ns.getURI();
            }
            else {
                throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given");
            }
        }
        catch (JDOMException ex) {
            LOG.warn("JDOMException :", ex);
            throw new CoordinatorJobException(ErrorCode.E0700, ex.getMessage(), ex);
        }
    }

    /**
     * Merge default configuration with user-defined configuration.
     *
     * @throws CommandException thrown if failed to read or merge configurations
     */
    protected void mergeDefaultConfig() throws CommandException {
        Path configDefault = null;
        try {
            String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
            Path coordAppPath = new Path(coordAppPathStr);
            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
            String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
            FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group,
                    coordAppPath.toUri(), new Configuration());

            // app path could be a directory
            if (!fs.isFile(coordAppPath)) {
                configDefault = new Path(coordAppPath, CONFIG_DEFAULT);
            } else {
                configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT);
            }

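            // coord-config-default.xml supplies defaults only: injectDefaults() copies a property into the job
            // configuration only when the job configuration does not already define it.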
            if (fs.exists(configDefault)) {
                Configuration defaultConf = new XConfiguration(fs.open(configDefault));
                PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
                XConfiguration.injectDefaults(defaultConf, conf);
            }
            else {
                LOG.info("configDefault doesn't exist: " + configDefault);
            }
            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);

            // Resolving all variables in the job properties.
            // This ensures the Hadoop Configuration semantics are preserved.
            XConfiguration resolvedVarsConf = new XConfiguration();
            for (Map.Entry<String, String> entry : conf) {
                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
            }
            conf = resolvedVarsConf;
        }
        catch (IOException e) {
            throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
                    + configDefault, e);
        }
        catch (HadoopAccessorException e) {
            throw new CommandException(e);
        }
        LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
    }

    /**
     * This method resolves all the variables defined in the configuration. It also includes the dataset
     * definitions from the dataset file in the XML.
     *
     * @param appXml : Original job XML
     * @param conf : Configuration of the job
     * @param coordJob : Coordinator job bean to be populated.
     * @return Resolved and modified job XML element.
     * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets
     * @throws Exception thrown if failed to resolve basic entities or include referred datasets
     */
    public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob)
            throws CoordinatorJobException, Exception {
        Element basicResolvedApp = resolveInitial(conf, appXml, coordJob);
        includeDataSets(basicResolvedApp, conf);
        return basicResolvedApp;
    }

    /**
     * Insert data set into data-in and data-out tags.
     *
     * @param eAppXml : coordinator application XML
     * @param eDatasets : DataSet XML
     */
    @SuppressWarnings("unchecked")
    private void insertDataSet(Element eAppXml, Element eDatasets) {
        // Adding DS definition in the coordinator XML
        Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
        if (inputList != null) {
            for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
                Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset"));
                dataIn.getContent().add(0, eDataset);
            }
        }
        Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
        if (outputList != null) {
            for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
                Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset"));
                dataOut.getContent().add(0, eDataset);
            }
        }
    }

    /**
     * Find a specific dataset from a list of datasets.
     *
     * @param eDatasets : List of data sets
     * @param name : queried data set name
     * @return the matching dataset element; a RuntimeException is thrown if the dataset is not defined
     */
    @SuppressWarnings("unchecked")
    private static Element findDataSet(Element eDatasets, String name) {
        for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
            if (eDataset.getAttributeValue("name").equals(name)) {
                eDataset = (Element) eDataset.clone();
                eDataset.detach();
                return eDataset;
            }
        }
        throw new RuntimeException("undefined dataset: " + name);
    }

    /**
     * Initialize all the required EL Evaluators.
     */
    protected void initEvaluators() {
        evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq");
        evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs");
        evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances");
        evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit");
    }

    /**
     * Resolve basic entities using job Configuration.
     *
     * @param conf : Job configuration
     * @param appXml : Original job XML
     * @param coordJob : Coordinator job bean to be populated.
     * @return Resolved job XML element.
     * @throws CoordinatorJobException thrown if failed to resolve basic entities
     * @throws Exception thrown if failed to resolve basic entities
     */
    @SuppressWarnings("unchecked")
    protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob)
            throws CoordinatorJobException, Exception {
        Element eAppXml = XmlUtils.parseXml(appXml);
        // job's main attributes
        // frequency
        String val = resolveAttribute("frequency", eAppXml, evalFreq);
        int ival = ParamChecker.checkInteger(val, "frequency");
        ParamChecker.checkGTZero(ival, "frequency");
        coordJob.setFrequency(ival);
        TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq
                .getVariable("timeunit"));
        addAnAttribute("freq_timeunit", eAppXml, tmp.toString());
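        // The resolved time unit (and, below, end-of-duration) are written back onto the job XML as extra
        // attributes, so they are preserved in the stored job definition alongside the job bean fields.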
        // TimeUnit
        coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString()));
        // End Of Duration
        tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq
                .getVariable("endOfDuration"));
        addAnAttribute("end_of_duration", eAppXml, tmp.toString());
        // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean

        // Application name
        if (this.coordName == null) {
            val = resolveAttribute("name", eAppXml, evalNofuncs);
            coordJob.setAppName(val);
        }
        else {
            // this coord job is created from bundle
            coordJob.setAppName(this.coordName);
        }

        // start time
        val = resolveAttribute("start", eAppXml, evalNofuncs);
        ParamChecker.checkUTC(val, "start");
        coordJob.setStartTime(DateUtils.parseDateUTC(val));
        // end time
        val = resolveAttribute("end", eAppXml, evalNofuncs);
        ParamChecker.checkUTC(val, "end");
        coordJob.setEndTime(DateUtils.parseDateUTC(val));
        // Time zone
        val = resolveAttribute("timezone", eAppXml, evalNofuncs);
        ParamChecker.checkTimeZone(val, "timezone");
        coordJob.setTimeZone(val);

        // controls
        val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
        if (val.isEmpty()) {
            val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL);
        }

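        // Clamp the timeout: negative values and values above the configured maximum
        // (129600 minutes, i.e. 90 days, by default) fall back to the maximum.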
        ival = ParamChecker.checkInteger(val, "timeout");
        if (ival < 0 || ival > Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600)) {
            ival = Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600);
        }
        coordJob.setTimeout(ival);

        val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
        if (val == null || val.isEmpty()) {
            val = Services.get().getConf().get(CONF_DEFAULT_CONCURRENCY, "-1");
        }
        ival = ParamChecker.checkInteger(val, "concurrency");
        coordJob.setConcurrency(ival);

        val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
        if (val == null || val.isEmpty()) {
            int defaultThrottle = Services.get().getConf().getInt(CONF_DEFAULT_THROTTLE, 12);
            ival = defaultThrottle;
        }
        else {
            ival = ParamChecker.checkInteger(val, "throttle");
        }
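        // Cap the per-job materialization throttle at a fraction of the callable queue size so that a single
        // coordinator cannot flood the queue.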
        int maxQueue = Services.get().getConf().getInt(CONF_QUEUE_SIZE, 10000);
        float factor = Services.get().getConf().getFloat(CONF_MAT_THROTTLING_FACTOR, 0.10f);
        int maxThrottle = (int) (maxQueue * factor);
        if (ival > maxThrottle || ival < 1) {
            ival = maxThrottle;
        }
        LOG.debug("max throttle " + ival);
        coordJob.setMatThrottling(ival);

        val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
        if (val.isEmpty()) {
            val = Execution.FIFO.toString();
        }
        coordJob.setExecution(Execution.valueOf(val));
        String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString() };
        ParamChecker.isMember(val, acceptedVals, "execution");

        // datasets
        resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs);
        // for each data set
        resolveDataSets(eAppXml);
        HashMap<String, String> dataNameList = new HashMap<String, String>();
        resolveIOEvents(eAppXml, dataNameList);

        resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
                eAppXml.getNamespace()), evalNofuncs);
        // TODO: If the action or workflow tag is missing, a NullPointerException will occur
        Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
                eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace());
        evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList);
        if (configElem != null) {
            for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
                resolveTagContents("name", propElem, evalData);
                // Check the data integrity of the value on a clone so the XML itself is not modified
                // (this applies to properties only)
                Element tmpProp = (Element) propElem.clone();
                resolveTagContents("value", tmpProp, evalData);
            }
        }
        resolveSLA(eAppXml, coordJob);
        return eAppXml;
    }

    /**
     * Resolve SLA events
     *
     * @param eAppXml job XML
     * @param coordJob coordinator job bean
     * @throws CommandException thrown if failed to resolve sla events
     */
    private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException {
        Element eSla = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("info",
                Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));

        if (eSla != null) {
            String slaXml = XmlUtils.prettyPrint(eSla).toString();
            try {
                // EL evaluation
                slaXml = evalSla.evaluate(slaXml, String.class);
                // Validate against the SLA XSD
                XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
            }
            catch (Exception e) {
                throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e);
            }
        }
    }

    /**
     * Resolve input-events/data-in and output-events/data-out tags.
     *
     * @param eJobOrg : Job element (it is cloned; the original element is not modified)
     * @param dataNameList : map populated with the data-in and data-out event names
     * @throws CoordinatorJobException thrown if failed to resolve input and output events
     */
    @SuppressWarnings("unchecked")
    private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException {
        // Resolving input-events/data-in
        // Clone the job and don't update anything in the original
        Element eJob = (Element) eJobOrg.clone();
        Element inputList = eJob.getChild("input-events", eJob.getNamespace());
        if (inputList != null) {
            TreeSet<String> eventNameSet = new TreeSet<String>();
            for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) {
                String dataInName = dataIn.getAttributeValue("name");
                dataNameList.put(dataInName, "data-in");
                // check whether there is any duplicate data-in name
                if (eventNameSet.contains(dataInName)) {
                    throw new RuntimeException("Duplicate dataIn name " + dataInName);
                }
                else {
                    eventNameSet.add(dataInName);
                }
                resolveTagContents("instance", dataIn, evalInst);
                resolveTagContents("start-instance", dataIn, evalInst);
                resolveTagContents("end-instance", dataIn, evalInst);
            }
        }
        // Resolving output-events/data-out
        Element outputList = eJob.getChild("output-events", eJob.getNamespace());
        if (outputList != null) {
            TreeSet<String> eventNameSet = new TreeSet<String>();
            for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) {
                String dataOutName = dataOut.getAttributeValue("name");
                dataNameList.put(dataOutName, "data-out");
                // check whether there is any duplicate data-out name
                if (eventNameSet.contains(dataOutName)) {
                    throw new RuntimeException("Duplicate dataOut name " + dataOutName);
                }
                else {
                    eventNameSet.add(dataOutName);
                }
                resolveTagContents("instance", dataOut, evalInst);
            }
        }
    }

    /**
     * Add an attribute into XML element.
     *
     * @param attrName : attribute name
     * @param elem : Element to add attribute
     * @param value : Value of attribute
     */
    private void addAnAttribute(String attrName, Element elem, String value) {
        elem.setAttribute(attrName, value);
    }

    /**
     * Resolve datasets using job configuration.
     *
     * @param eAppXml : Job Element XML
     * @throws Exception thrown if failed to resolve datasets
     */
    @SuppressWarnings("unchecked")
    private void resolveDataSets(Element eAppXml) throws Exception {
        Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace());
        if (datasetList != null) {
            List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace());
            resolveDataSets(dsElems);
            resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
                    eAppXml.getNamespace()), evalNofuncs);
        }
    }

    /**
     * Resolve datasets using job configuration.
     *
     * @param dsElems : List of dataset XML elements.
     * @throws CoordinatorJobException thrown if failed to resolve datasets
     */
    private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException {
        for (Element dsElem : dsElems) {
            // Setting up default TimeUnit and EndOfDuration
            evalFreq.setVariable("timeunit", TimeUnit.MINUTE);
            evalFreq.setVariable("endOfDuration", TimeUnit.NONE);

            String val = resolveAttribute("frequency", dsElem, evalFreq);
            int ival = ParamChecker.checkInteger(val, "frequency");
            ParamChecker.checkGTZero(ival, "frequency");
            addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE
                    .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString());
            addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE
                    .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString());
            val = resolveAttribute("initial-instance", dsElem, evalNofuncs);
            ParamChecker.checkUTC(val, "initial-instance");
            val = resolveAttribute("timezone", dsElem, evalNofuncs);
            ParamChecker.checkTimeZone(val, "timezone");
            resolveTagContents("uri-template", dsElem, evalNofuncs);
            resolveTagContents("done-flag", dsElem, evalNofuncs);
        }
    }

    /**
     * Resolve the content of a tag.
     *
     * @param tagName : Tag name of job XML, e.g. <timeout>10</timeout>
     * @param elem : Element where the tag exists.
     * @param eval : EL evaluator
     * @return Resolved tag content.
     * @throws CoordinatorJobException thrown if failed to resolve tag content
     */
    @SuppressWarnings("unchecked")
    private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
        String ret = "";
        if (elem != null) {
            for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) {
                if (tagElem != null) {
                    String updated;
                    try {
                        updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim());
                    }
                    catch (Exception e) {
                        throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
                    }
                    tagElem.removeContent();
                    tagElem.addContent(updated);
                    ret += updated;
                }
            }
        }
        return ret;
    }

    /**
     * Resolve an attribute value.
     *
     * @param attrName : Attribute name.
     * @param elem : XML Element where attribute is defined
     * @param eval : ELEvaluator used to resolve
     * @return Resolved attribute value
     * @throws CoordinatorJobException thrown if failed to resolve an attribute value
     */
    private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
        Attribute attr = elem.getAttribute(attrName);
        String val = null;
        if (attr != null) {
            try {
                val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
            }
            catch (Exception e) {
                throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
            }
            attr.setValue(val);
        }
        return val;
    }

    /**
     * Include referred datasets into XML.
     *
     * @param resolvedXml : Job XML element.
     * @param conf : Job configuration
     * @throws CoordinatorJobException thrown if failed to include referred datasets into XML
     */
    @SuppressWarnings("unchecked")
    protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException {
        Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace());
        Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace());
        List<String> dsList = new ArrayList<String>();
        if (datasets != null) {
            for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) {
                String incDSFile = includeElem.getTextTrim();
                includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace());
            }
            for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) {
                String dsName = e.getAttributeValue("name");
                if (dsList.contains(dsName)) { // Override with this DS
                    // Remove old DS
                    removeDataSet(allDataSets, dsName);
                }
                else {
                    dsList.add(dsName);
                }
                allDataSets.addContent((Element) e.clone());
            }
        }
        insertDataSet(resolvedXml, allDataSets);
        resolvedXml.removeChild("datasets", resolvedXml.getNamespace());
    }

    /**
     * Include one dataset file.
     *
     * @param incDSFile : Include data set filename.
     * @param dsList : List of dataset names, used to detect duplicates.
     * @param allDataSets : Element that includes all dataset definitions.
     * @param dsNameSpace : Data set name space
     * @throws CoordinatorJobException thrown if failed to include one dataset file
     */
    @SuppressWarnings("unchecked")
    private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace)
            throws CoordinatorJobException {
        Element tmpDataSets = null;
        try {
            String dsXml = readDefinition(incDSFile);
            LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml);
            tmpDataSets = XmlUtils.parseXml(dsXml);
        }
        catch (JDOMException e) {
            LOG.warn("Error parsing included dataset [{0}]. Message [{1}]", incDSFile, e.getMessage());
            throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage());
        }
        resolveDataSets(tmpDataSets.getChildren("dataset"));
        for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) {
            String dsName = e.getAttributeValue("name");
            if (dsList.contains(dsName)) {
                throw new RuntimeException("Duplicate Dataset " + dsName);
            }
            dsList.add(dsName);
            Element tmp = (Element) e.clone();
            // TODO: Don't like to over-write the external/include DS's namespace
            tmp.setNamespace(dsNameSpace);
            tmp.getChild("uri-template").setNamespace(dsNameSpace);
            if (e.getChild("done-flag") != null) {
                tmp.getChild("done-flag").setNamespace(dsNameSpace);
            }
            allDataSets.addContent(tmp);
        }
        // nested include
        for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) {
            String incFile = includeElem.getTextTrim();
            includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace);
        }
    }

    /**
     * Remove a dataset from a list of datasets.
     *
     * @param eDatasets : List of datasets
     * @param name : Dataset name to be removed.
     */
    @SuppressWarnings("unchecked")
    private static void removeDataSet(Element eDatasets, String name) {
        for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
            if (eDataset.getAttributeValue("name").equals(name)) {
                eDataset.detach();
                // found and removed; without returning here the method would always fall through and throw
                return;
            }
        }
        throw new RuntimeException("undefined dataset: " + name);
    }

    /**
     * Read coordinator definition.
     *
     * @param appPath application path.
     * @return coordinator definition.
     * @throws CoordinatorJobException thrown if the definition could not be read.
     */
    protected String readDefinition(String appPath) throws CoordinatorJobException {
        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
        String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
        // Configuration confHadoop = CoordUtils.getHadoopConf(conf);
        try {
            URI uri = new URI(appPath);
            LOG.debug("user =" + user + " group =" + group);
            FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, uri,
                    new Configuration());
            Path appDefPath = null;

            // app path could be a directory
            Path path = new Path(uri.getPath());
            // check file exists for dataset include file, app xml already checked
            if (!fs.exists(path)) {
                throw new URISyntaxException(path.toString(), "path does not exist: " + path.toString());
            }
            if (!fs.isFile(path)) {
                appDefPath = new Path(path, COORDINATOR_XML_FILE);
            } else {
                appDefPath = path;
            }

            Reader reader = new InputStreamReader(fs.open(appDefPath));
            StringWriter writer = new StringWriter();
            IOUtils.copyCharStream(reader, writer);
            return writer.toString();
        }
        catch (IOException ex) {
            LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
            throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
        }
        catch (URISyntaxException ex) {
            LOG.warn("URISyntaxException :" + ex.getMessage());
            throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex);
        }
        catch (HadoopAccessorException ex) {
            throw new CoordinatorJobException(ex);
        }
        catch (Exception ex) {
            LOG.warn("Exception :", ex);
            throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
        }
    }

    /**
     * Write a coordinator job into the database.
     *
     * @param eJob : XML element of job
     * @param coordJob : Coordinator job bean
     * @return Job id
     * @throws CommandException thrown if unable to save coordinator job to db
     */
    private String storeToDB(Element eJob, CoordinatorJobBean coordJob) throws CommandException {
        String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR);
        coordJob.setId(jobId);
        coordJob.setAuthToken(this.authToken);

        coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
        coordJob.setCreatedTime(new Date());
        coordJob.setUser(conf.get(OozieClient.USER_NAME));
        coordJob.setGroup(conf.get(OozieClient.GROUP_NAME));
        coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
        coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
        coordJob.setLastActionNumber(0);
        coordJob.setLastModifiedTime(new Date());

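        // Persist only for real submissions; on dryrun the populated bean stays in memory so that submit()
        // can materialize and return the would-be actions without touching the database.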
        if (!dryrun) {
            coordJob.setLastModifiedTime(new Date());
            try {
                jpaService.execute(new CoordJobInsertJPAExecutor(coordJob));
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }
        }
        return jobId;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    protected String getEntityKey() {
        return null;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return false;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService == null) {
            throw new CommandException(ErrorCode.E0610);
        }
        coordJob = new CoordinatorJobBean();
        if (this.bundleId != null) {
            // this coord job is created from bundle
            coordJob.setBundleId(this.bundleId);
            // first use bundle id if submitted through a bundle
            logInfo.setParameter(DagXLogInfoService.JOB, this.bundleId);
            LogUtils.setLogInfo(logInfo);
        }
        if (this.coordName != null) {
            // this coord job is created from bundle
            coordJob.setAppName(this.coordName);
        }
        setJob(coordJob);
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException {
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
     */
    @Override
    public void notifyParent() throws CommandException {
        // update bundle action
        if (coordJob.getBundleId() != null) {
            LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId());
            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
            bundleStatusUpdate.call();
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
     */
    @Override
    public void updateJob() throws CommandException {
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return coordJob;
    }
}