001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026
027import javax.xml.XMLConstants;
028import javax.xml.transform.stream.StreamSource;
029import javax.xml.validation.Schema;
030import javax.xml.validation.SchemaFactory;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.oozie.ErrorCode;
034import org.apache.oozie.util.IOUtils;
035import org.xml.sax.SAXException;
036
037/**
038 * Service that loads Oozie workflow definition schema and registered extension
039 * schemas.
040 */
041public class SchemaService implements Service {
042
043    public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchemaService.";
044
045    public static final String WF_CONF_SCHEMAS = CONF_PREFIX + "wf.schemas";
046
047    public static final String WF_CONF_EXT_SCHEMAS = CONF_PREFIX + "wf.ext.schemas";
048
049    public static final String COORD_CONF_SCHEMAS = CONF_PREFIX + "coord.schemas";
050
051    public static final String COORD_CONF_EXT_SCHEMAS = CONF_PREFIX + "coord.ext.schemas";
052
053    public static final String BUNDLE_CONF_SCHEMAS = CONF_PREFIX + "bundle.schemas";
054
055    public static final String BUNDLE_CONF_EXT_SCHEMAS = CONF_PREFIX + "bundle.ext.schemas";
056
057    public static final String SLA_CONF_SCHEMAS = CONF_PREFIX + "sla.schemas";
058
059    public static final String SLA_CONF_EXT_SCHEMAS = CONF_PREFIX + "sla.ext.schemas";
060
061    @Deprecated
062    public static final String SLA_NAME_SPACE_URI = "uri:oozie:sla:0.1";
063
064    public static final String SLA_NAMESPACE_URI_2 = "uri:oozie:sla:0.2";
065
066    public static final String COORDINATOR_NAMESPACE_URI_1 = "uri:oozie:coordinator:0.1";
067
068    private Schema wfSchema;
069
070    private Schema coordSchema;
071
072    private Schema bundleSchema;
073
074    private Schema slaSchema;
075
076    private Schema loadSchema(String baseSchemas, String extSchema) throws SAXException, IOException {
077        Set<String> schemaNames = new HashSet<String>();
078        String[] schemas = ConfigurationService.getStrings(baseSchemas);
079        if (schemas != null) {
080            for (String schema : schemas) {
081                schema = schema.trim();
082                if (!schema.isEmpty()) {
083                    schemaNames.add(schema);
084                }
085            }
086        }
087        schemas = ConfigurationService.getStrings(extSchema);
088        if (schemas != null) {
089            for (String schema : schemas) {
090                schema = schema.trim();
091                if (!schema.isEmpty()) {
092                    schemaNames.add(schema);
093                }
094            }
095        }
096        List<StreamSource> sources = new ArrayList<StreamSource>();
097        for (String schemaName : schemaNames) {
098            sources.add(new StreamSource(IOUtils.getResourceAsStream(schemaName, -1)));
099        }
100        SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
101        return factory.newSchema(sources.toArray(new StreamSource[sources.size()]));
102    }
103
104    /**
105     * Initialize the service.
106     *
107     * @param services services instance.
108     * @throws ServiceException thrown if the service could not be initialized.
109     */
110    @Override
111    public void init(Services services) throws ServiceException {
112        try {
113            wfSchema = loadSchema(WF_CONF_SCHEMAS, WF_CONF_EXT_SCHEMAS);
114            coordSchema = loadSchema(COORD_CONF_SCHEMAS, COORD_CONF_EXT_SCHEMAS);
115            bundleSchema = loadSchema(BUNDLE_CONF_SCHEMAS, BUNDLE_CONF_EXT_SCHEMAS);
116            slaSchema = loadSchema(SLA_CONF_SCHEMAS, SLA_CONF_EXT_SCHEMAS);
117        }
118        catch (SAXException ex) {
119            throw new ServiceException(ErrorCode.E0130, ex.getMessage(), ex);
120        }
121        catch (IOException ex) {
122            throw new ServiceException(ErrorCode.E0131, ex.getMessage(), ex);
123        }
124    }
125
126    /**
127     * Return the public interface of the service.
128     *
129     * @return {@link SchemaService}.
130     */
131    @Override
132    public Class<? extends Service> getInterface() {
133        return SchemaService.class;
134    }
135
136    /**
137     * Destroy the service.
138     */
139    @Override
140    public void destroy() {
141        wfSchema = null;
142        bundleSchema = null;
143        slaSchema = null;
144        coordSchema = null;
145    }
146
147    /**
148     * Return the schema for XML validation of application definitions.
149     *
150     * @param schemaName: Name of schema definition (i.e.
151     *        WORKFLOW/COORDINATOR/BUNDLE)
152     * @return the schema for XML validation of application definitions.
153     */
154    public Schema getSchema(SchemaName schemaName) {
155        Schema returnSchema = null;
156        if (schemaName == SchemaName.WORKFLOW) {
157            returnSchema = wfSchema;
158        }
159        else if (schemaName == SchemaName.COORDINATOR) {
160            returnSchema = coordSchema;
161        }
162        else if (schemaName == SchemaName.BUNDLE) {
163            returnSchema = bundleSchema;
164        }
165        else if (schemaName == SchemaName.SLA_ORIGINAL) {
166            returnSchema = slaSchema;
167        }
168        else {
169            throw new RuntimeException("No schema found with name " + schemaName);
170        }
171        return returnSchema;
172    }
173
174    public enum SchemaName {
175        WORKFLOW(1), COORDINATOR(2), SLA_ORIGINAL(3), BUNDLE(4);
176        private final int id;
177
178        private SchemaName(int id) {
179            this.id = id;
180        }
181
182        public int getId() {
183            return id;
184        }
185    }
186}