/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.command;

import org.apache.commons.collections.CollectionUtils;
import org.apache.oozie.BinaryBlob;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.StringBlob;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.JsonSLAEvent;
import org.apache.oozie.service.SchemaCheckerService;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.sla.SLASummaryBean;
import org.apache.oozie.util.Pair;
import org.apache.oozie.util.XLog;
import org.apache.openjpa.persistence.jdbc.Index;

import javax.persistence.Column;
import javax.persistence.DiscriminatorColumn;
import javax.persistence.DiscriminatorType;
import javax.persistence.Id;
import javax.persistence.Lob;
import javax.persistence.Table;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

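/**
 * Checks that the actual database schema matches the schema Oozie expects, based on the JPA annotations of its
 * bean classes. Tables, columns (types and default values), primary keys, and indexes are compared; every
 * mismatch is logged as an error, and the overall result is reported back through {@link SchemaCheckerService}.
 * <p>
 * This command is normally queued periodically by {@link SchemaCheckerService}. A direct invocation would look
 * roughly like the following sketch (assuming the standard {@code XCommand} {@code call()} entry point and
 * placeholder connection settings):
 * <pre>{@code
 * // sketch only: jdbcUrl, dbUser, and dbPassword are placeholders for the real connection settings
 * new SchemaCheckXCommand("mysql", jdbcUrl, dbUser, dbPassword, false).call();
 * }</pre>
 */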
public class SchemaCheckXCommand extends XCommand<Void> {
    private XLog LOG = XLog.getLog(SchemaCheckXCommand.class);

    private String dbType;
    private String url;
    private String user;
    private String pass;
    private boolean ignoreExtras;

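    /**
     * Creates a schema check command.
     *
     * @param dbType database type ("mysql", "oracle", "postgresql", ...), used to adjust expected names and types
     * @param url JDBC URL of the Oozie database
     * @param user database user
     * @param pass database password
     * @param ignoreExtras if true, extra (unexpected) tables, columns, and indexes are not reported as problems
     */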
    public SchemaCheckXCommand(String dbType, String url, String user, String pass, boolean ignoreExtras) {
        super("schema-check", "schema-check", 0);
        this.dbType = dbType;
        this.url = url;
        this.user = user;
        this.pass = pass;
        this.ignoreExtras = ignoreExtras;
    }

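    /**
     * Connects to the database, builds the expected schema from the annotated bean classes, and checks tables,
     * columns, primary keys, and indexes against it. Problems are logged as errors, and the overall result and
     * start time are passed to {@link SchemaCheckerService#updateInstrumentation}.
     */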
    @Override
    protected Void execute() throws CommandException {
        Connection conn = null;
        LOG.info("About to check database schema");
        Date startTime = new Date();
        boolean problem = false;
        try {
            conn = DriverManager.getConnection(url, user, pass);
            String catalog = conn.getCatalog();
            DatabaseMetaData metaData = conn.getMetaData();

            Map<String, Class<? extends JsonBean>> tableClasses = new HashMap<String, Class<? extends JsonBean>>();
            tableClasses.put(getTableName(BundleActionBean.class), BundleActionBean.class);
            tableClasses.put(getTableName(BundleJobBean.class), BundleJobBean.class);
            tableClasses.put(getTableName(CoordinatorActionBean.class), CoordinatorActionBean.class);
            tableClasses.put(getTableName(CoordinatorJobBean.class), CoordinatorJobBean.class);
            tableClasses.put(getTableName(JsonSLAEvent.class), JsonSLAEvent.class);
            tableClasses.put(getTableName(SLARegistrationBean.class), SLARegistrationBean.class);
            tableClasses.put(getTableName(SLASummaryBean.class), SLASummaryBean.class);
            tableClasses.put(getTableName(WorkflowActionBean.class), WorkflowActionBean.class);
            tableClasses.put(getTableName(WorkflowJobBean.class), WorkflowJobBean.class);

            boolean tableProblem = checkTables(metaData, catalog, tableClasses.keySet());
            problem = problem | tableProblem;
            if (!tableProblem) {
                for (Map.Entry<String, Class<? extends JsonBean>> table : tableClasses.entrySet()) {
                    TableInfo ti = new TableInfo(table.getValue(), dbType);
                    boolean columnProblem = checkColumns(metaData, catalog, table.getKey(), ti.columnTypes);
                    problem = problem | columnProblem;
                    if (!columnProblem) {
                        boolean primaryKeyProblem = checkPrimaryKey(metaData, catalog, table.getKey(), ti.primaryKeyColumn);
                        problem = problem | primaryKeyProblem;
                        boolean indexProblem = checkIndexes(metaData, catalog, table.getKey(), ti.indexedColumns);
                        problem = problem | indexProblem;
                    }
                }
            }
            if (problem) {
                LOG.error("Database schema is BAD! Check previous error log messages for details");
            } else {
                LOG.info("Database schema is GOOD");
            }
        } catch (SQLException sqle) {
            LOG.error("An exception occurred while talking to the database: " + sqle.getMessage(), sqle);
            problem = true;
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {
                    LOG.error("An exception occurred while disconnecting from the database: " + e.getMessage(), e);
                }
            }
            Services.get().get(SchemaCheckerService.class).updateInstrumentation(problem, startTime);
        }
        return null;
    }

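    /**
     * Checks that every expected table exists. Besides the bean tables, the internal "oozie_sys",
     * "openjpa_sequence_table", and "validate_conn" tables are expected as well. Unless extras are ignored,
     * tables found in the database but not expected are also reported.
     *
     * @return true if a problem was found, false otherwise
     */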
    private boolean checkTables(DatabaseMetaData metaData, String catalog, final Collection<String> expectedTablesRaw)
            throws SQLException {
        boolean problem = false;
        Set<String> expectedTables = new HashSet<String>(expectedTablesRaw);
        expectedTables.add(caseTableName("oozie_sys"));
        expectedTables.add(caseTableName("openjpa_sequence_table"));
        expectedTables.add(caseTableName("validate_conn"));
        // Oracle returns more than 1000 tables if we don't restrict the lookup to the "OOZIE" schema;
        // MySQL and PostgreSQL don't want a schema filter here
        String schema = null;
        if (dbType.equals("oracle")) {
            schema = "OOZIE";
        }
        ResultSet rs = metaData.getTables(catalog, schema, null, new String[]{"TABLE"});
        Set<String> foundTables = new HashSet<String>();
        while (rs.next()) {
            String tabName = rs.getString("TABLE_NAME");
            if (tabName != null) {
                foundTables.add(tabName);
            }
        }
        Collection missingTables = CollectionUtils.subtract(expectedTables, foundTables);
        if (!missingTables.isEmpty()) {
            LOG.error("Found [{0}] missing tables: {1}", missingTables.size(), Arrays.toString(missingTables.toArray()));
            problem = true;
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("No missing tables found: {0}", Arrays.toString(expectedTables.toArray()));
        }
        if (!ignoreExtras) {
            Collection extraTables = CollectionUtils.subtract(foundTables, expectedTables);
            if (!extraTables.isEmpty()) {
                LOG.error("Found [{0}] extra tables: {1}", extraTables.size(), Arrays.toString(extraTables.toArray()));
                problem = true;
            } else {
                LOG.debug("No extra tables found");
            }
        }
        return problem;
    }

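    /**
     * Checks that the given table has every expected column, that each column has the expected SQL type, and
     * that no column declares a default value. Unless extras are ignored, unexpected columns are also reported.
     *
     * @return true if a problem was found, false otherwise
     */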
    private boolean checkColumns(DatabaseMetaData metaData, String catalog, String table,
                                 Map<String, Integer> expectedColumnTypes) throws SQLException {
        boolean problem = false;
        Map<String, Pair<Integer, String>> foundColumns = new HashMap<String, Pair<Integer, String>>();
        ResultSet rs = metaData.getColumns(catalog, null, table, null);
        while (rs.next()) {
            String colName = rs.getString("COLUMN_NAME");
            Integer dataType = rs.getInt("DATA_TYPE");
            String colDef = rs.getString("COLUMN_DEF");
            if (colName != null) {
                foundColumns.put(colName, new Pair<Integer, String>(dataType, colDef));
            }
        }
        Collection missingColumns = CollectionUtils.subtract(expectedColumnTypes.keySet(), foundColumns.keySet());
        if (!missingColumns.isEmpty()) {
            LOG.error("Found [{0}] missing columns in table [{1}]: {2}",
                    missingColumns.size(), table, Arrays.toString(missingColumns.toArray()));
            problem = true;
        } else {
            for (Map.Entry<String, Integer> ent : expectedColumnTypes.entrySet()) {
                if (!foundColumns.get(ent.getKey()).getFirst().equals(ent.getValue())) {
                    LOG.error("Expected column [{0}] in table [{1}] to have type [{2}], but found type [{3}]",
                            ent.getKey(), table, getSQLTypeFromInt(ent.getValue()),
                            getSQLTypeFromInt(foundColumns.get(ent.getKey()).getFirst()));
                    problem = true;
                } else if (foundColumns.get(ent.getKey()).getSecond() != null) {
                    LOG.error("Expected column [{0}] in table [{1}] to have default value [NULL], but found default value [{2}]",
                            ent.getKey(), table, foundColumns.get(ent.getKey()).getSecond());
                    problem = true;
                } else {
                    LOG.debug("Found column [{0}] in table [{1}] with type [{2}] and default value [NULL]",
                            ent.getKey(), table, getSQLTypeFromInt(ent.getValue()));
                }
            }
        }
        if (!ignoreExtras) {
            Collection extraColumns = CollectionUtils.subtract(foundColumns.keySet(), expectedColumnTypes.keySet());
            if (!extraColumns.isEmpty()) {
                LOG.error("Found [{0}] extra columns in table [{1}]: {2}",
                        extraColumns.size(), table, Arrays.toString(extraColumns.toArray()));
                problem = true;
            } else {
                LOG.debug("No extra columns found in table [{0}]", table);
            }
        }
        return problem;
    }

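    /**
     * Checks that the given table's primary key is on the expected column.
     *
     * @return true if a problem was found, false otherwise
     */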
    private boolean checkPrimaryKey(DatabaseMetaData metaData, String catalog, String table, String expectedPrimaryKeyColumn)
            throws SQLException {
        boolean problem = false;
        ResultSet rs = metaData.getPrimaryKeys(catalog, null, table);
        if (!rs.next()) {
            LOG.error("Expected column [{0}] to be the primary key in table [{1}], but none were found",
                    expectedPrimaryKeyColumn, table);
            problem = true;
        } else {
            String foundPrimaryKeyColumn = rs.getString("COLUMN_NAME");
            if (!foundPrimaryKeyColumn.equals(expectedPrimaryKeyColumn)) {
                LOG.error("Expected column [{0}] to be the primary key in table [{1}], but found column [{2}] instead",
                        expectedPrimaryKeyColumn, table, foundPrimaryKeyColumn);
                problem = true;
            } else {
                LOG.debug("Found column [{0}] to be the primary key in table [{1}]", expectedPrimaryKeyColumn, table);
            }
        }
        return problem;
    }

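    /**
     * Checks that every expected indexed column in the given table actually has an index. Unless extras are
     * ignored, indexes on unexpected columns are also reported.
     *
     * @return true if a problem was found, false otherwise
     */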
    private boolean checkIndexes(DatabaseMetaData metaData, String catalog, String table, Set<String> expectedIndexedColumns)
            throws SQLException {
        boolean problem = false;
        Set<String> foundIndexedColumns = new HashSet<String>();
        ResultSet rs = metaData.getIndexInfo(catalog, null, table, false, true);
        while (rs.next()) {
            String colName = rs.getString("COLUMN_NAME");
            if (colName != null) {
                foundIndexedColumns.add(colName);
            }
        }
        Collection missingIndexColumns = CollectionUtils.subtract(expectedIndexedColumns, foundIndexedColumns);
        if (!missingIndexColumns.isEmpty()) {
            LOG.error("Found [{0}] missing indexes for columns in table [{1}]: {2}",
                    missingIndexColumns.size(), table, Arrays.toString(missingIndexColumns.toArray()));
            problem = true;
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No missing indexes found in table [{0}]: {1}",
                        table, Arrays.toString(expectedIndexedColumns.toArray()));
            }
        }
        if (!ignoreExtras) {
            Collection extraIndexColumns = CollectionUtils.subtract(foundIndexedColumns, expectedIndexedColumns);
            if (!extraIndexColumns.isEmpty()) {
                LOG.error("Found [{0}] extra indexes for columns in table [{1}]: {2}",
                        extraIndexColumns.size(), table, Arrays.toString(extraIndexColumns.toArray()));
                problem = true;
            } else {
                LOG.debug("No extra indexes found in table [{0}]", table);
            }
        }
        return problem;
    }

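    /**
     * Returns the table name declared by the bean's {@link Table} annotation, cased for the configured database
     * type, or null if the class has no such annotation.
     */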
    private String getTableName(Class<? extends JsonBean> clazz) {
        Table tabAnn = clazz.getAnnotation(Table.class);
        if (tabAnn != null) {
            return caseTableName(tabAnn.name());
        }
        return null;
    }

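    /**
     * Adjusts the case of a table name to match how the configured database type reports it.
     */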
    private String caseTableName(String name) {
        // MySQL and Oracle want table names in all caps
        if (dbType.equals("mysql") || dbType.equals("oracle")) {
            return name.toUpperCase();
        }
        // Postgres wants table names in all lowercase
        if (dbType.equals("postgresql")) {
            return name.toLowerCase();
        }
        return name;
    }

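    /**
     * Returns a human-readable name for a {@link Types} constant, for use in log messages.
     */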
    private String getSQLTypeFromInt(int t) {
        switch (t) {
            case Types.BIT:
                return "BIT";
            case Types.TINYINT:
                return "TINYINT";
            case Types.SMALLINT:
                return "SMALLINT";
            case Types.INTEGER:
                return "INTEGER";
            case Types.BIGINT:
                return "BIGINT";
            case Types.FLOAT:
                return "FLOAT";
            case Types.REAL:
                return "REAL";
            case Types.DOUBLE:
                return "DOUBLE";
            case Types.NUMERIC:
                return "NUMERIC";
            case Types.DECIMAL:
                return "DECIMAL";
            case Types.CHAR:
                return "CHAR";
            case Types.VARCHAR:
                return "VARCHAR";
            case Types.LONGVARCHAR:
                return "LONGVARCHAR";
            case Types.DATE:
                return "DATE";
            case Types.TIME:
                return "TIME";
            case Types.TIMESTAMP:
                return "TIMESTAMP";
            case Types.BINARY:
                return "BINARY";
            case Types.VARBINARY:
                return "VARBINARY";
            case Types.LONGVARBINARY:
                return "LONGVARBINARY";
            case Types.NULL:
                return "NULL";
            case Types.OTHER:
                return "OTHER";
            case Types.JAVA_OBJECT:
                return "JAVA_OBJECT";
            case Types.DISTINCT:
                return "DISTINCT";
            case Types.STRUCT:
                return "STRUCT";
            case Types.ARRAY:
                return "ARRAY";
            case Types.BLOB:
                return "BLOB";
            case Types.CLOB:
                return "CLOB";
            case Types.REF:
                return "REF";
            case Types.DATALINK:
                return "DATALINK";
            case Types.BOOLEAN:
                return "BOOLEAN";
            case Types.ROWID:
                return "ROWID";
            case Types.NCHAR:
                return "NCHAR";
            case Types.NVARCHAR:
                return "NVARCHAR";
            case Types.LONGNVARCHAR:
                return "LONGNVARCHAR";
            case Types.NCLOB:
                return "NCLOB";
            case Types.SQLXML:
                return "SQLXML";
            default:
                return "unknown";
        }
    }

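    /**
     * Expected schema information for a single table: column types, indexed columns, and the primary key column,
     * all derived by reflection from the JPA annotations of the corresponding bean class.
     */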
    private static class TableInfo {
        String primaryKeyColumn;
        Map<String, Integer> columnTypes;
        Set<String> indexedColumns;

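        /**
         * Builds the expected schema information for the table mapped by the given bean class and database type.
         */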
        public TableInfo(Class<? extends JsonBean> clazz, String dbType) {
            columnTypes = new HashMap<String, Integer>();
            indexedColumns = new HashSet<String>();
            populate(clazz, dbType);
            // The "SLA_EVENTS" table is made up of two classes (JsonSLAEvent and SLAEventBean), and the reflection doesn't pick up
            // from both automatically, so we have to manually do this
            if (clazz.equals(JsonSLAEvent.class)) {
                populate(SLAEventBean.class, dbType);
            }
        }

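        /**
         * Walks the declared fields of the bean class and records the expected column names, SQL types, indexed
         * columns, and primary key column from their {@code @Column}, {@code @Lob}, {@code @Index}, and
         * {@code @Id} annotations, plus the class-level {@code @DiscriminatorColumn} annotation if present.
         */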
        private void populate(Class<? extends JsonBean> clazz, String dbType) {
            Field[] fields = clazz.getDeclaredFields();
            for (Field field : fields) {
                Column colAnn = field.getAnnotation(Column.class);
                if (colAnn != null) {
                    String name = caseColumnName(colAnn.name(), dbType);
                    boolean isLob = (field.getAnnotation(Lob.class) != null);
                    Integer type = getSQLType(field.getType(), isLob, dbType);
                    columnTypes.put(name, type);
                    boolean isIndex = (field.getAnnotation(Index.class) != null);
                    if (isIndex) {
                        indexedColumns.add(name);
                    }
                    boolean isPrimaryKey = (field.getAnnotation(Id.class) != null);
                    if (isPrimaryKey) {
                        indexedColumns.add(name);
                        primaryKeyColumn = name;
                    }
                } else {
                    // Some Id fields don't have an @Column annotation
                    Id idAnn = field.getAnnotation(Id.class);
                    if (idAnn != null) {
                        String name = caseColumnName(field.getName(), dbType);
                        boolean isLob = (field.getAnnotation(Lob.class) != null);
                        Integer type = getSQLType(field.getType(), isLob, dbType);
                        columnTypes.put(name, type);
                        indexedColumns.add(name);
                        primaryKeyColumn = name;
                    }
                }
            }
            DiscriminatorColumn discAnn = clazz.getAnnotation(DiscriminatorColumn.class);
            if (discAnn != null) {
                String name = caseColumnName(discAnn.name(), dbType);
                Integer type = getSQLType(discAnn.discriminatorType());
                columnTypes.put(name, type);
                indexedColumns.add(name);
            }
            // For some reason, MySQL doesn't end up having this index...
            if (dbType.equals("mysql") && clazz.equals(WorkflowActionBean.class)) {
                indexedColumns.remove("wf_id");
            }
        }

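        /**
         * Maps a Java field type to the java.sql.Types constant expected for the given database type, taking
         * the {@code @Lob} annotation into account for String fields; returns null for unmapped types.
         */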
        private static Integer getSQLType(Class<?> clazz, boolean isLob, String dbType) {
            if (clazz.equals(String.class)) {
                if (dbType.equals("mysql") && isLob) {
                    return Types.LONGVARCHAR;
                }
                if (dbType.equals("oracle") && isLob) {
                    return Types.CLOB;
                }
                return Types.VARCHAR;
            }
            if (clazz.equals(StringBlob.class) || clazz.equals(BinaryBlob.class)) {
                if (dbType.equals("mysql")) {
                    return Types.LONGVARBINARY;
                }
                if (dbType.equals("oracle")) {
                    return Types.BLOB;
                }
                return Types.BINARY;
            }
            if (clazz.equals(Timestamp.class)) {
                return Types.TIMESTAMP;
            }
            if (clazz.equals(int.class)) {
                if (dbType.equals("oracle")) {
                    return Types.DECIMAL;
                }
                return Types.INTEGER;
            }
            if (clazz.equals(long.class)) {
                if (dbType.equals("oracle")) {
                    return Types.DECIMAL;
                }
                return Types.BIGINT;
            }
            if (clazz.equals(byte.class)) {
                if (dbType.equals("mysql")) {
                    return Types.TINYINT;
                }
                if (dbType.equals("oracle")) {
                    return Types.DECIMAL;
                }
                return Types.SMALLINT;
            }
            return null;
        }

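        /**
         * Maps a JPA {@link DiscriminatorType} to the expected java.sql.Types constant.
         */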
        private static Integer getSQLType(DiscriminatorType discType) {
            switch (discType) {
                case STRING:
                    return Types.VARCHAR;
                case CHAR:
                    return Types.CHAR;
                case INTEGER:
                    return Types.INTEGER;
            }
            return null;
        }

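        /**
         * Adjusts the case of a column name to match how the given database type reports it.
         */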
        private static String caseColumnName(String name, String dbType) {
            // Oracle wants column names in all caps
            if (dbType.equals("oracle")) {
                return name.toUpperCase();
            }
            // Postgres and MySQL want column names in all lowercase
            if (dbType.equals("postgresql") || dbType.equals("mysql")) {
                return name.toLowerCase();
            }
            return name;
        }
    }

    @Override
    protected void loadState() throws CommandException {
    }

    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
    }

    @Override
    protected boolean isLockRequired() {
        return false;
    }

    @Override
    public String getEntityKey() {
        return null;
    }
}