001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.oozie.util.IOUtils; 022 import org.apache.oozie.ErrorCode; 023 import org.xml.sax.SAXException; 024 025 import javax.xml.XMLConstants; 026 import javax.xml.transform.stream.StreamSource; 027 import javax.xml.validation.Schema; 028 import javax.xml.validation.SchemaFactory; 029 import java.io.IOException; 030 import java.util.ArrayList; 031 import java.util.List; 032 033 /** 034 * Service that loads Oozie workflow definition schema and registered extension schemas. 035 */ 036 public class WorkflowSchemaService implements Service { 037 038 public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowSchemaService."; 039 040 public static final String CONF_EXT_SCHEMAS = CONF_PREFIX + "ext.schemas"; 041 042 private Schema dagSchema; 043 044 private static final String OOZIE_WORKFLOW_XSD = "oozie-workflow-0.1.xsd"; 045 046 private Schema loadSchema(Configuration conf) throws SAXException, IOException { 047 List<StreamSource> sources = new ArrayList<StreamSource>(); 048 sources.add(new StreamSource(IOUtils.getResourceAsStream(OOZIE_WORKFLOW_XSD, -1))); 049 String[] schemas = conf.getStrings(CONF_EXT_SCHEMAS); 050 if (schemas != null) { 051 for (String schema : schemas) { 052 sources.add(new StreamSource(IOUtils.getResourceAsStream(schema, -1))); 053 } 054 } 055 SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); 056 return factory.newSchema(sources.toArray(new StreamSource[sources.size()])); 057 } 058 059 /** 060 * Initialize the service. 061 * 062 * @param services services instance. 063 * @throws ServiceException thrown if the service could not be initialized. 064 */ 065 public void init(Services services) throws ServiceException { 066 try { 067 dagSchema = loadSchema(services.getConf()); 068 } 069 catch (SAXException ex) { 070 throw new ServiceException(ErrorCode.E0130, ex.getMessage(), ex); 071 } 072 catch (IOException ex) { 073 throw new ServiceException(ErrorCode.E0131, ex.getMessage(), ex); 074 } 075 } 076 077 /** 078 * Return the public interface of the service. 079 * 080 * @return {@link WorkflowSchemaService}. 081 */ 082 public Class<? extends Service> getInterface() { 083 return WorkflowSchemaService.class; 084 } 085 086 /** 087 * Destroy the service. 088 */ 089 public void destroy() { 090 dagSchema = null; 091 } 092 093 /** 094 * Return the schema for XML validation of application definitions. 095 * 096 * @return the schema for XML validation of application definitions. 097 */ 098 public Schema getSchema() { 099 return dagSchema; 100 } 101 102 }