001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.List;
023    
024    import javax.xml.XMLConstants;
025    import javax.xml.transform.stream.StreamSource;
026    import javax.xml.validation.Schema;
027    import javax.xml.validation.SchemaFactory;
028    
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.oozie.ErrorCode;
031    import org.apache.oozie.util.IOUtils;
032    import org.xml.sax.SAXException;
033    
034    /**
035     * Service that loads Oozie workflow definition schema and registered extension
036     * schemas.
037     */
038    public class SchemaService implements Service {
039    
040        public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchemaService.";
041    
042        public static final String WF_CONF_EXT_SCHEMAS = CONF_PREFIX + "wf.ext.schemas";
043    
044        public static final String COORD_CONF_EXT_SCHEMAS = CONF_PREFIX + "coord.ext.schemas";
045    
046        public static final String BUNDLE_CONF_EXT_SCHEMAS = CONF_PREFIX + "bundle.ext.schemas";
047    
048        public static final String SLA_CONF_EXT_SCHEMAS = CONF_PREFIX + "sla.ext.schemas";
049    
050        public static final String SLA_NAME_SPACE_URI = "uri:oozie:sla:0.1";
051    
052        public static final String COORDINATOR_NAMESPACE_URI_1 = "uri:oozie:coordinator:0.1";
053    
054        private Schema wfSchema;
055    
056        private Schema coordSchema;
057    
058        private Schema bundleSchema;
059    
060        private Schema slaSchema;
061    
062        private static final String OOZIE_WORKFLOW_XSD[] = { 
063            "oozie-workflow-0.1.xsd", 
064            "oozie-workflow-0.2.xsd",
065            "oozie-workflow-0.2.5.xsd",
066            "oozie-workflow-0.3.xsd",
067            "oozie-workflow-0.4.xsd"};
068        private static final String OOZIE_COORDINATOR_XSD[] = { "oozie-coordinator-0.1.xsd", "oozie-coordinator-0.2.xsd", 
069            "oozie-coordinator-0.3.xsd", "oozie-coordinator-0.4.xsd"};
070        private static final String OOZIE_BUNDLE_XSD[] = { "oozie-bundle-0.1.xsd", "oozie-bundle-0.2.xsd" };
071        private static final String OOZIE_SLA_SEMANTIC_XSD[] = { "gms-oozie-sla-0.1.xsd" };
072    
073        private Schema loadSchema(Configuration conf, String[] baseSchemas, String extSchema) throws SAXException,
074        IOException {
075            List<StreamSource> sources = new ArrayList<StreamSource>();
076            for (String baseSchema : baseSchemas) {
077                sources.add(new StreamSource(IOUtils.getResourceAsStream(baseSchema, -1)));
078            }
079            String[] schemas = conf.getStrings(extSchema);
080            if (schemas != null) {
081                for (String schema : schemas) {
082                    sources.add(new StreamSource(IOUtils.getResourceAsStream(schema, -1)));
083                }
084            }
085            SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
086            return factory.newSchema(sources.toArray(new StreamSource[sources.size()]));
087        }
088    
089        /**
090         * Initialize the service.
091         *
092         * @param services services instance.
093         * @throws ServiceException thrown if the service could not be initialized.
094         */
095        public void init(Services services) throws ServiceException {
096            try {
097                wfSchema = loadSchema(services.getConf(), OOZIE_WORKFLOW_XSD, WF_CONF_EXT_SCHEMAS);
098                coordSchema = loadSchema(services.getConf(), OOZIE_COORDINATOR_XSD, COORD_CONF_EXT_SCHEMAS);
099                bundleSchema = loadSchema(services.getConf(), OOZIE_BUNDLE_XSD, BUNDLE_CONF_EXT_SCHEMAS);
100                slaSchema = loadSchema(services.getConf(), OOZIE_SLA_SEMANTIC_XSD, SLA_CONF_EXT_SCHEMAS);
101                bundleSchema = loadSchema(services.getConf(), OOZIE_BUNDLE_XSD, BUNDLE_CONF_EXT_SCHEMAS);
102            }
103            catch (SAXException ex) {
104                throw new ServiceException(ErrorCode.E0130, ex.getMessage(), ex);
105            }
106            catch (IOException ex) {
107                throw new ServiceException(ErrorCode.E0131, ex.getMessage(), ex);
108            }
109        }
110    
111        /**
112         * Return the public interface of the service.
113         *
114         * @return {@link SchemaService}.
115         */
116        public Class<? extends Service> getInterface() {
117            return SchemaService.class;
118        }
119    
120        /**
121         * Destroy the service.
122         */
123        public void destroy() {
124            wfSchema = null;
125            bundleSchema = null;
126            slaSchema = null;
127            coordSchema = null;
128        }
129    
130        /**
131         * Return the schema for XML validation of application definitions.
132         *
133         * @param schemaName: Name of schema definition (i.e.
134         *        WORKFLOW/COORDINATOR/BUNDLE)
135         * @return the schema for XML validation of application definitions.
136         */
137        public Schema getSchema(SchemaName schemaName) {
138            Schema returnSchema = null;
139            if (schemaName == SchemaName.WORKFLOW) {
140                returnSchema = wfSchema;
141            }
142            else if (schemaName == SchemaName.COORDINATOR) {
143                returnSchema = coordSchema;
144            }
145            else if (schemaName == SchemaName.BUNDLE) {
146                returnSchema = bundleSchema;
147            }
148            else if (schemaName == SchemaName.SLA_ORIGINAL) {
149                returnSchema = slaSchema;
150            }
151            else {
152                throw new RuntimeException("No schema found with name " + schemaName);
153            }
154            return returnSchema;
155        }
156    
157        public enum SchemaName {
158            WORKFLOW(1), COORDINATOR(2), SLA_ORIGINAL(3), BUNDLE(4);
159            private final int id;
160    
161            private SchemaName(int id) {
162                this.id = id;
163            }
164    
165            public int getId() {
166                return id;
167            }
168        }
169    }