001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import org.apache.oozie.ErrorCode;
022import org.apache.oozie.command.SchemaCheckXCommand;
023import org.apache.oozie.util.Instrumentable;
024import org.apache.oozie.util.Instrumentation;
025import org.apache.oozie.util.XLog;
026
027import java.util.Date;
028
029public class SchemaCheckerService implements Service, Instrumentable {
030    private XLog LOG = XLog.getLog(SchemaCheckerService.class);
031
032    public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchemaCheckerService.";
033    public static final String CONF_INTERVAL = CONF_PREFIX + "check.interval";
034    public static final String CONF_IGNORE_EXTRAS = CONF_PREFIX + "ignore.extras";
035
036    private String status = "N/A (not yet run)";
037    private String lastCheck = "N/A";
038
039    @Override
040    public void init(Services services) throws ServiceException {
041        String url = ConfigurationService.get(JPAService.CONF_URL);
042        String dbType = url.substring("jdbc:".length());
043        dbType = dbType.substring(0, dbType.indexOf(":"));
044
045        int interval = ConfigurationService.getInt(CONF_INTERVAL);
046        if (dbType.equals("derby") || dbType.equals("hsqldb") || dbType.equals("sqlserver") || interval <= 0) {
047            LOG.debug("SchemaCheckerService is disabled: not supported for {0}", dbType);
048            status = "DISABLED (" + dbType + " not supported)";
049        } else {
050            String driver = ConfigurationService.get(JPAService.CONF_DRIVER);
051            String user = ConfigurationService.get(JPAService.CONF_USERNAME);
052            String pass = ConfigurationService.getPassword(JPAService.CONF_PASSWORD, "");
053            boolean ignoreExtras = ConfigurationService.getBoolean(CONF_IGNORE_EXTRAS);
054
055            try {
056                Class.forName(driver).newInstance();
057            } catch (Exception ex) {
058                throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex);
059            }
060            Runnable schemaCheckerRunnable = new SchemaCheckerRunnable(dbType, url, user, pass, ignoreExtras);
061            services.get(SchedulerService.class).schedule(schemaCheckerRunnable, 0, interval, SchedulerService.Unit.HOUR);
062        }
063    }
064
065    @Override
066    public void destroy() {
067    }
068
069    @Override
070    public Class<? extends Service> getInterface() {
071        return SchemaCheckerService.class;
072    }
073
074    @Override
075    public void instrument(Instrumentation instr) {
076        instr.addVariable("schema-checker", "status", new Instrumentation.Variable<String>() {
077            @Override
078            public String getValue() {
079                return status;
080            }
081        });
082        instr.addVariable("schema-checker", "last-check", new Instrumentation.Variable<String>() {
083            @Override
084            public String getValue() {
085                return lastCheck;
086            }
087        });
088    }
089
090    public void updateInstrumentation(boolean problem, Date time) {
091        if (problem) {
092            status = "BAD (check log for details)";
093        } else {
094            status = "GOOD";
095        }
096        lastCheck = time.toString();
097    }
098
099    private class SchemaCheckerRunnable implements Runnable {
100        private String dbType;
101        private String url;
102        private String user;
103        private String pass;
104        private boolean ignoreExtras;
105
106        public SchemaCheckerRunnable(String dbType, String url, String user, String pass, boolean ignoreExtras) {
107            this.dbType = dbType;
108            this.url = url;
109            this.user = user;
110            this.pass = pass;
111            this.ignoreExtras = ignoreExtras;
112        }
113
114        @Override
115        public void run() {// Only queue the schema check command if this is the leader
116            if (Services.get().get(JobsConcurrencyService.class).isLeader()) {
117                Services.get().get(CallableQueueService.class).queue(
118                        new SchemaCheckXCommand(dbType, url, user, pass, ignoreExtras));
119            } else {
120                status = "DISABLED (not leader in HA)";
121                lastCheck = "N/A";
122            }
123        }
124    }
125}