/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.service;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.command.SchemaCheckXCommand;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XLog;

import java.util.Date;

/**
 * Service that periodically queues a {@link SchemaCheckXCommand} and publishes the outcome of the most recent
 * check through two instrumentation variables in the {@code schema-checker} group: {@code status} and
 * {@code last-check}.
 * <p>
 * The check is not scheduled when the database type parsed from the JDBC URL is {@code derby}, {@code hsqldb}
 * or {@code sqlserver}, or when the configured check interval is zero or negative.
 */
public class SchemaCheckerService implements Service, Instrumentable {
    private final XLog LOG = XLog.getLog(SchemaCheckerService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchemaCheckerService.";
    public static final String CONF_INTERVAL = CONF_PREFIX + "check.interval";
    public static final String CONF_IGNORE_EXTRAS = CONF_PREFIX + "ignore.extras";

    // Both fields are written by the scheduled runnable and by updateInstrumentation(), and read by the
    // instrumentation variables registered in instrument(). Those accesses are expected to happen on different
    // threads, so the fields are volatile to make the latest value visible to readers.
    private volatile String status = "N/A (not yet run)";
    private volatile String lastCheck = "N/A";

    /**
     * Reads the JDBC settings and, when the checker is enabled, loads the JDBC driver and schedules the
     * recurring schema check.
     *
     * @param services the services instance used to look up the {@link SchedulerService}
     * @throws ServiceException with {@link ErrorCode#E0100} if the JDBC driver class cannot be loaded or
     *         instantiated
     */
    @Override
    public void init(Services services) throws ServiceException {
        String url = ConfigurationService.get(JPAService.CONF_URL);
        // The database type is the token between "jdbc:" and the next ':' (e.g. "mysql" in "jdbc:mysql://...").
        String dbType = url.substring("jdbc:".length());
        dbType = dbType.substring(0, dbType.indexOf(":"));

        int interval = ConfigurationService.getInt(CONF_INTERVAL);
        if (dbType.equals("derby") || dbType.equals("hsqldb") || dbType.equals("sqlserver")) {
            LOG.debug("SchemaCheckerService is disabled: not supported for {0}", dbType);
            status = "DISABLED (" + dbType + " not supported)";
        } else if (interval <= 0) {
            // Report the real reason instead of blaming a database type that is actually supported.
            LOG.debug("SchemaCheckerService is disabled: check interval is {0}", String.valueOf(interval));
            status = "DISABLED (check interval " + interval + " is not positive)";
        } else {
            String driver = ConfigurationService.get(JPAService.CONF_DRIVER);
            String user = ConfigurationService.get(JPAService.CONF_USERNAME);
            String pass = ConfigurationService.getPassword(JPAService.CONF_PASSWORD, "");
            boolean ignoreExtras = ConfigurationService.getBoolean(CONF_IGNORE_EXTRAS);

            try {
                Class.forName(driver).newInstance();
            } catch (Exception ex) {
                throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex);
            }
            Runnable schemaCheckerRunnable = new SchemaCheckerRunnable(dbType, url, user, pass, ignoreExtras);
            // Scheduled with the arguments (0, interval, HOUR); presumably an initial delay of 0 and a period
            // of `interval` hours -- confirm against SchedulerService.schedule.
            services.get(SchedulerService.class).schedule(schemaCheckerRunnable, 0, interval,
                    SchedulerService.Unit.HOUR);
        }
    }

    /**
     * Does nothing; this service holds no resources of its own to release.
     */
    @Override
    public void destroy() {
    }

    /**
     * @return {@code SchemaCheckerService.class}
     */
    @Override
    public Class<? extends Service> getInterface() {
        return SchemaCheckerService.class;
    }

    /**
     * Registers the {@code status} and {@code last-check} variables in the {@code schema-checker} group. Each
     * variable returns the current value of the corresponding field whenever it is read.
     *
     * @param instr the instrumentation to register the variables with
     */
    @Override
    public void instrument(Instrumentation instr) {
        instr.addVariable("schema-checker", "status", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                return status;
            }
        });
        instr.addVariable("schema-checker", "last-check", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                return lastCheck;
            }
        });
    }

    /**
     * Records the outcome of a schema check so that it is reported by the instrumentation variables.
     *
     * @param problem {@code true} to report the status as BAD, {@code false} to report it as GOOD
     * @param time the time of the check; its {@code toString()} value becomes the {@code last-check} value
     */
    public void updateInstrumentation(boolean problem, Date time) {
        if (problem) {
            status = "BAD (check log for details)";
        } else {
            status = "GOOD";
        }
        lastCheck = time.toString();
    }

    /**
     * Task handed to the {@link SchedulerService}. It is an inner (non-static) class because it updates the
     * enclosing service's {@code status} and {@code lastCheck} fields.
     */
    private class SchemaCheckerRunnable implements Runnable {
        private final String dbType;
        private final String url;
        private final String user;
        private final String pass;
        private final boolean ignoreExtras;

        public SchemaCheckerRunnable(String dbType, String url, String user, String pass, boolean ignoreExtras) {
            this.dbType = dbType;
            this.url = url;
            this.user = user;
            this.pass = pass;
            this.ignoreExtras = ignoreExtras;
        }

        @Override
        public void run() {
            // Only queue the schema check command if this is the leader
            if (Services.get().get(JobsConcurrencyService.class).isLeader()) {
                Services.get().get(CallableQueueService.class).queue(
                        new SchemaCheckXCommand(dbType, url, user, pass, ignoreExtras));
            } else {
                status = "DISABLED (not leader in HA)";
                lastCheck = "N/A";
            }
        }
    }
}