In this article I will show you how to quickly build a queryable "database" with the help of a framework: Apache Calcite. I will walk through two examples — querying the contents of files directly with SQL, and querying MySQL through Calcite — and finish by showing how to run SQL queries against Kafka data.
Calcite
Calcite is a pluggable foundational framework (it is a framework) for optimizing query processing over heterogeneous data sources. It can expose any data, anywhere ("any data, anywhere") through a SQL-based query engine, and we can selectively use only the parts of it that we need.
What can Calcite do?
- Use SQL to access certain data in memory
- Use SQL to access data from a file
- Cross-data-source access, aggregation, sorting, etc. (for example, joining a MySQL table with data from Redis)
When we need to build a "database" ourselves, the data may exist in any form: text, Word, XML, MySQL, Elasticsearch, CSV, third-party API responses, and so on. All we have is the data, and we want to be able to add, delete, update and query it with dynamic SQL.
In addition, data processing systems such as Hive, Drill, Flink, Phoenix and Storm all use Calcite for SQL parsing and query optimization, and some also use it to build their own JDBC drivers.
Glossary
Token
A token is a keyword of standard SQL (think of MySQL's dialect) or one of the strings between keywords. Each token is encapsulated into a SqlNode, and SqlNode has many subclasses; a SELECT, for example, is encapsulated as a SqlSelect. A SqlNode can also be rendered back into SQL text.
RelDataTypeField
The name and type information of a field
RelDataType
Multiple RelDataTypeFields make up a RelDataType, which can be understood as the type of a data row
Table
The complete information of a table
Schema
The collection of all metadata; it can be understood as a set of Tables, i.e. the concept of a database (library).
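To make these terms concrete, here is a small sketch that uses Calcite's SqlParser (with its default settings) to parse a statement into a SqlNode, inspect the resulting SqlSelect, and render it back into SQL text. The query string is only an illustration and is not tied to the examples below.

```java
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.parser.SqlParser;

public class GlossaryDemo {
    public static void main(String[] args) throws Exception {
        // The parser tokenizes the text and builds a tree of SqlNode objects.
        SqlNode node = SqlParser.create("select NAME, MONEY from ASSET where MONEY > 100")
            .parseQuery();

        // A SELECT statement is encapsulated as a SqlSelect, a subclass of SqlNode.
        SqlSelect select = (SqlSelect) node;
        System.out.println(select.getSelectList()); // the select list
        System.out.println(select.getFrom());       // the FROM clause
        System.out.println(select.getWhere());      // the WHERE clause

        // A SqlNode can also be rendered back into SQL text.
        System.out.println(node.toSqlString(AnsiSqlDialect.DEFAULT));
    }
}
```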
Getting started
1. Import the dependency
```xml
<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <!-- Latest version at the time of writing, released 2022-09-10 -->
    <version>1.32.0</version>
</dependency>
```
2. Create the model.json file and the CSV table structure
model.json describes how Calcite should create the Schema; in other words, it tells the framework how to build the library.
{ "version" : "1.0" , //Ignore "defaultSchema" : "CSV" , //Set the default schema "schemas" : [ //Can define multiple schemas { "name" : "CSV" , // Equivalent The value of namespace and the above defaultSchema corresponds to "type" : "custom" , //hard-coded "factory" : "csv.CsvSchemaFactory" , //the class name of factory must be the full path of the factory package you implemented yourself ." operand" : { //You can pass custom parameters here, which will eventually be passed to the factory's operand parameter in the form of a map. "directory" : "csv" //Directory represents that calcite will read all csv in the csv directory under resources. Files, the Schema created by the factory will build all these files into a Table, which can be understood as the root directory for reading data files. Of course, the name of the key does not have to be directory, you can specify it at will } } ] }
Next, define a CSV file that describes the table structure.
```
NAME:string,MONEY:string
aixiaoxian,100 million
xiaobai,100 million
adong,100 million
maomao,100 million
xixi,100 million
zizi,100 million
wuwu,100 million
kuku,100 million
```
The structure of the entire project is roughly like this:
3. Implement the Schema factory class
In the package path specified in model.json, write a CsvSchemaFactory class that implements the SchemaFactory interface and its single method, create, which creates the Schema (the library).
```java
public class CsvSchemaFactory implements SchemaFactory {
    /**
     * @param parentSchema the parent node, usually the root schema
     * @param name         the name defined in model.json
     * @param operand      the "operand" map from model.json; custom parameters are passed in here
     */
    @Override
    public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
        final String directory = (String) operand.get("directory");
        File directoryFile = new File(directory);
        return new CsvSchema(directoryFile, "scannable");
    }
}
```
4. Customize the Schema class
Now that we have the SchemaFactory, we need our own Schema class.
A custom Schema has to implement the Schema interface, but implementing it directly means implementing a lot of methods. Instead we extend the official AbstractSchema class, so only one method needs to be implemented (if you have other customization needs, you can still implement the raw interface).
The core logic lives in the createTableMap method, which creates the Tables.
It scans all the CSV files under the configured directory in resources, maps each file to a Table object, and returns them as a map; several other methods of the Schema interface rely on this map.
```java
// Only this method needs to be implemented
@Override
protected Map<String, Table> getTableMap() {
    if (tableMap == null) {
        tableMap = createTableMap();
    }
    return tableMap;
}

private Map<String, Table> createTableMap() {
    // Look for files in the directory ending in ".csv"
    final Source baseSource = Sources.of(directoryFile);
    // Files with other suffixes are filtered out automatically; ".csv" is hard-coded here
    File[] files = directoryFile.listFiles((dir, name) -> {
        final String nameSansGz = trim(name, ".gz");
        return nameSansGz.endsWith(".csv");
    });
    if (files == null) {
        System.out.println("directory " + directoryFile + " not found");
        files = new File[0];
    }
    // Build a map from table name to table; each file becomes a table.
    final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
    for (File file : files) {
        Source source = Sources.of(file);
        final Source sourceSansCsv = source.trimOrNull(".csv");
        if (sourceSansCsv != null) {
            final Table table = createTable(source);
            builder.put(sourceSansCsv.relative(baseSource).path(), table);
        }
    }
    return builder.build();
}
```
5. Customize the Table
The Schema is ready, and each CSV data file is mapped to a Table: one CSV file corresponds to one Table.
Next we customize the Table. The core of a custom Table is defining the names and types of its fields and how to read the CSV file.
- First, obtain the field names and types, i.e. the structure of a single table, from the CSV file header (we define the header format ourselves, so the rules can be customized too).
```java
/** Base class for a table that reads CSV files. */
public abstract class CsvTable extends AbstractTable {
    protected final Source source;
    protected final @Nullable RelProtoDataType protoRowType;
    private @Nullable RelDataType rowType;
    private @Nullable List<RelDataType> fieldTypes;

    /** Creates a CsvTable. */
    CsvTable(Source source, @Nullable RelProtoDataType protoRowType) {
        this.source = source;
        this.protoRowType = protoRowType;
    }

    /**
     * CsvTable extends AbstractTable and must implement getRowType, which returns the
     * structure of the current table. There are many kinds of Table (view types, for
     * example). AbstractTable implements some Table methods for us by default, such as
     * getJdbcTableType (which defaults to TABLE). If you have other customization needs
     * you can implement the Table interface directly, very much like AbstractSchema.
     */
    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        if (protoRowType != null) {
            return protoRowType.apply(typeFactory);
        }
        if (rowType == null) {
            rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source, null);
        }
        return rowType;
    }

    /** Returns the field types of this CSV table. */
    public List<RelDataType> getFieldTypes(RelDataTypeFactory typeFactory) {
        if (fieldTypes == null) {
            fieldTypes = new ArrayList<>();
            CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source, fieldTypes);
        }
        return fieldTypes;
    }

    public static RelDataType deduceRowType(JavaTypeFactory typeFactory, Source source,
            @Nullable List<RelDataType> fieldTypes) {
        final List<RelDataType> types = new ArrayList<>();
        final List<String> names = new ArrayList<>();
        try (CSVReader reader = openCsv(source)) {
            String[] strings = reader.readNext();
            if (strings == null) {
                strings = new String[]{"EmptyFileHasNoColumns:boolean"};
            }
            for (String string : strings) {
                final String name;
                final RelDataType fieldType;
                // Simply split each header cell on the colon: the name is before it,
                // the type after it.
                final int colon = string.indexOf(':');
                if (colon >= 0) {
                    name = string.substring(0, colon);
                    String typeString = string.substring(colon + 1);
                    Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString);
                    if (decimalMatcher.matches()) {
                        int precision = Integer.parseInt(decimalMatcher.group(1));
                        int scale = Integer.parseInt(decimalMatcher.group(2));
                        fieldType = parseDecimalSqlType(typeFactory, precision, scale);
                    } else {
                        switch (typeString) {
                        case "string":
                            fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
                            break;
                        case "boolean":
                            fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN);
                            break;
                        case "byte":
                            fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT);
                            break;
                        case "char":
                            fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR);
                            break;
                        case "short":
                            fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT);
                            break;
                        case "int":
                            fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER);
                            break;
                        case "long":
                            fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT);
                            break;
                        case "float":
                            fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL);
                            break;
                        case "double":
                            fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE);
                            break;
                        case "date":
                            fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE);
                            break;
                        case "timestamp":
                            fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP);
                            break;
                        case "time":
                            fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME);
                            break;
                        default:
                            LOGGER.warn("Found unknown type: {} in file: {} for column: {}. "
                                + "Will assume the type of column is string.",
                                typeString, source.path(), name);
                            fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
                            break;
                        }
                    }
                } else {
                    // If no type is declared, the field name is the whole cell and the type
                    // defaults to VARCHAR (string)
                    name = string;
                    fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
                }
                names.add(name);
                types.add(fieldType);
                if (fieldTypes != null) {
                    fieldTypes.add(fieldType);
                }
            }
        } catch (IOException e) {
            // ignore
        }
        if (names.isEmpty()) {
            names.add("line");
            types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
        }
        return typeFactory.createStructType(Pair.zip(names, types));
    }
}
```
- Then read the data in the file. With the table's field names and types in hand, the last step is to read the data itself. We implement the ScannableTable interface and its single method, scan; it essentially reads the file line by line and converts each line according to the field types deduced above.
```java
@Override
public Enumerable<Object[]> scan(DataContext root) {
    JavaTypeFactory typeFactory = root.getTypeFactory();
    final List<RelDataType> fieldTypes = getFieldTypes(typeFactory);
    final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size());
    final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
    return new AbstractEnumerable<@Nullable Object[]>() {
        @Override
        public Enumerator<@Nullable Object[]> enumerator() {
            // Return our custom class that reads the data
            return new CsvEnumerator<>(source, cancelFlag, false, null,
                CsvEnumerator.arrayConverter(fieldTypes, fields, false));
        }
    };
}

public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream,
        @Nullable String @Nullable [] filterValues, RowConverter<E> rowConverter) {
    this.cancelFlag = cancelFlag;
    this.rowConverter = rowConverter;
    this.filterValues = filterValues == null ? null
        : ImmutableNullableList.copyOf(filterValues);
    try {
        this.reader = openCsv(source);
        // Skip the first line, because that is where the names and types are defined
        this.reader.readNext(); // skip header row
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

// CsvEnumerator must implement Calcite's own Enumerator, which has current() and
// moveNext() methods: current() returns the record at the current cursor position,
// and moveNext() advances the cursor to the next record. The official code also
// defines a row converter that turns the strings in the CSV file into the types
// declared in the file header; we have to provide this ourselves.
@Override
public E current() {
    return castNonNull(current);
}

@Override
public boolean moveNext() {
    try {
    outer:
        for (;;) {
            if (cancelFlag.get()) {
                return false;
            }
            final String[] strings = reader.readNext();
            if (strings == null) {
                current = null;
                reader.close();
                return false;
            }
            if (filterValues != null) {
                for (int i = 0; i < strings.length; i++) {
                    String filterValue = filterValues.get(i);
                    if (filterValue != null) {
                        if (!filterValue.equals(strings[i])) {
                            continue outer;
                        }
                    }
                }
            }
            current = rowConverter.convertRow(strings);
            return true;
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

protected @Nullable Object convert(@Nullable RelDataType fieldType, @Nullable String string) {
    if (fieldType == null || string == null) {
        return string;
    }
    switch (fieldType.getSqlTypeName()) {
    case BOOLEAN:
        if (string.length() == 0) { return null; }
        return Boolean.parseBoolean(string);
    case TINYINT:
        if (string.length() == 0) { return null; }
        return Byte.parseByte(string);
    case SMALLINT:
        if (string.length() == 0) { return null; }
        return Short.parseShort(string);
    case INTEGER:
        if (string.length() == 0) { return null; }
        return Integer.parseInt(string);
    case BIGINT:
        if (string.length() == 0) { return null; }
        return Long.parseLong(string);
    case FLOAT:
        if (string.length() == 0) { return null; }
        return Float.parseFloat(string);
    case DOUBLE:
        if (string.length() == 0) { return null; }
        return Double.parseDouble(string);
    case DECIMAL:
        if (string.length() == 0) { return null; }
        return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string);
    case DATE:
        if (string.length() == 0) { return null; }
        try {
            Date date = TIME_FORMAT_DATE.parse(string);
            return (int) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY);
        } catch (ParseException e) {
            return null;
        }
    case TIME:
        if (string.length() == 0) { return null; }
        try {
            Date date = TIME_FORMAT_TIME.parse(string);
            return (int) date.getTime();
        } catch (ParseException e) {
            return null;
        }
    case TIMESTAMP:
        if (string.length() == 0) { return null; }
        try {
            Date date = TIME_FORMAT_TIMESTAMP.parse(string);
            return date.getTime();
        } catch (ParseException e) {
            return null;
        }
    case VARCHAR:
    default:
        return string;
    }
}
```
6. Finally
At this point everything we need is ready: the library, the table names, the field names and the field types. Next we write SQL statements to query our data files.
Create a few test data files. In the project structure above, for example, I created two CSV files, USERINFO.csv and ASSET.csv, and then a test class.
Run it and you can query the data directly with SQL statements.
```java
public class Test {
    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        Statement statement = null;
        try {
            Properties info = new Properties();
            info.put("model",
                Sources.of(Test.class.getResource("/model.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            print(statement.executeQuery("select * from asset"));
            print(statement.executeQuery("select * from userinfo"));
            print(statement.executeQuery("select age from userinfo where name = 'aixiaoxian'"));
            print(statement.executeQuery("select * from userinfo where age > 60"));
            print(statement.executeQuery("select * from userinfo where name like 'a%'"));
        } finally {
            connection.close();
        }
    }

    private static void print(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i));
                if (i < columnCount) {
                    System.out.print(", ");
                } else {
                    System.out.println();
                    break;
                }
            }
        }
    }
}
```
Query results:
I hit two pitfalls while testing; knowing them in advance will save you some trouble if you experiment yourself.
- By default, Calcite converts all table and column names in your SQL statements to uppercase. Since the file name of a CSV file (the same applies to other file types) is used as the table name, your file names must be written in uppercase unless you customize the casing rules.
- Calcite has some reserved keywords that cannot be used as table names, otherwise the query will fail. For example, the user.csv file I created at first could not be found until I renamed it to USERINFO.csv. This is similar to MySQL's reserved words and can likewise be changed through configuration.
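For reference, the casing behaviour can also be adjusted on the Calcite JDBC connection itself instead of renaming files. The sketch below relies on Calcite's documented lex/unquotedCasing/caseSensitive connection properties; the model path is a placeholder, so adapt it to your own project.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class CasingDemo {
    public static void main(String[] args) throws Exception {
        Properties info = new Properties();
        info.put("model", "/path/to/model.json");  // placeholder: path to your own model.json
        info.put("unquotedCasing", "UNCHANGED");   // do not upper-case unquoted identifiers
        info.put("caseSensitive", "false");        // resolve table/column names case-insensitively
        // Alternatively, info.put("lex", "MYSQL") switches the whole lexical policy at once.
        try (Connection connection = DriverManager.getConnection("jdbc:calcite:", info)) {
            System.out.println("connected: " + !connection.isClosed());
        }
    }
}
```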
Demo: MySQL
- First, prepare what Calcite needs: the library, table names, field names and field types.
When the data source is MySQL, we do not need to define any of these in the Java service; they are created directly in the MySQL client. Here I create two tables for testing, mirroring our CSV files.
```sql
CREATE TABLE `USERINFO1` (
  `NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  `AGE` int DEFAULT NULL
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb3;

CREATE TABLE `ASSET` (
  `NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  `MONEY` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb3;
```
- The SchemaFactory and Schema we wrote for the CSV case do not need to be created here, because Calcite ships with a MySQL (JDBC) adapter by default.
- In fact, neither of those two steps is needed at all. All we really have to do is give Calcite the JDBC connection information, which again goes into the model.json file.
{ "version" : "1.0" , "defaultSchema" : "Demo" , "schemas" : [ { "name" : "Demo" , "type" : "custom" , // This is calcite's default SchemaFactory, inside The process is the same as what we defined above. Let’s take a brief look at the source code. "factory" : "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory" , "operand" : { // I am using mysql8 or above, so pay attention to the package name here "jdbcDriver" : "com.mysql. cj.jdbc.Driver" , "jdbcUrl" : "jdbc:mysql://localhost:3306/irving" , "jdbcUser" : "root" , "jdbcPassword" : "123456" } } ] }
- Add the MySQL driver dependency to the project.
```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.30</version>
</dependency>
```
- Write the test class; that is all it takes to complete the whole feature.
```java
public class TestMysql {
    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        Statement statement = null;
        try {
            Properties info = new Properties();
            info.put("model",
                Sources.of(TestMysql.class.getResource("/mysqlmodel.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            statement.executeUpdate("insert into userinfo1 values ('xxx',12)");
            print(statement.executeQuery("select * from asset"));
            print(statement.executeQuery("select * from userinfo1"));
            print(statement.executeQuery("select age from userinfo1 where name = 'aixiaoxian'"));
            print(statement.executeQuery("select * from userinfo1 where age > 60"));
            print(statement.executeQuery("select * from userinfo1 where name like 'a%'"));
        } finally {
            connection.close();
        }
    }

    private static void print(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i));
                if (i < columnCount) {
                    System.out.print(", ");
                } else {
                    System.out.println();
                    break;
                }
            }
        }
    }
}
```
Query results:
How the MySQL adapter works
Above, we specified the org.apache.calcite.adapter.jdbc.JdbcSchema$Factory class in model.json; let's take a look at its code.
This class bundles the Factory and the Schema together: the factory's create method is called to create a Schema, exactly the same flow as in our custom implementation above.
JdbcSchema is also an implementation of Schema, so it implements the getTable-related methods as well (we did the same above when we derived the table structure, field names and field types from the CSV file header). JdbcSchema does this by connecting to the MySQL server, querying its metadata, and wrapping that information into the object format Calcite requires.
The same two caveats from the CSV case apply here: capitalization and reserved keywords.
```java
public static JdbcSchema create(SchemaPlus parentSchema, String name,
        Map<String, Object> operand) {
    DataSource dataSource;
    try {
        final String dataSourceName = (String) operand.get("dataSource");
        if (dataSourceName != null) {
            dataSource = AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);
        } else {
            // This branch reads the JDBC connection information we specified in
            // model.json and finally creates a DataSource from it.
            final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");
            final String jdbcDriver = (String) operand.get("jdbcDriver");
            final String jdbcUser = (String) operand.get("jdbcUser");
            final String jdbcPassword = (String) operand.get("jdbcPassword");
            dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
        }
    } catch (Exception e) {
        throw new RuntimeException("Error while reading dataSource", e);
    }
    String jdbcCatalog = (String) operand.get("jdbcCatalog");
    String jdbcSchema = (String) operand.get("jdbcSchema");
    String sqlDialectFactory = (String) operand.get("sqlDialectFactory");
    if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {
        return JdbcSchema.create(parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
    } else {
        SqlDialectFactory factory =
            AvaticaUtils.instantiatePlugin(SqlDialectFactory.class, sqlDialectFactory);
        return JdbcSchema.create(parentSchema, name, dataSource, factory, jdbcCatalog, jdbcSchema);
    }
}

@Override
public @Nullable Table getTable(String name) {
    return getTableMap(false).get(name);
}

private synchronized ImmutableMap<String, JdbcTable> getTableMap(boolean force) {
    if (force || tableMap == null) {
        tableMap = computeTables();
    }
    return tableMap;
}

private ImmutableMap<String, JdbcTable> computeTables() {
    Connection connection = null;
    ResultSet resultSet = null;
    try {
        connection = dataSource.getConnection();
        final Pair<@Nullable String, @Nullable String> catalogSchema = getCatalogSchema(connection);
        final String catalog = catalogSchema.left;
        final String schema = catalogSchema.right;
        final Iterable<MetaImpl.MetaTable> tableDefs;
        Foo threadMetadata = THREAD_METADATA.get();
        if (threadMetadata != null) {
            tableDefs = threadMetadata.apply(catalog, schema);
        } else {
            final List<MetaImpl.MetaTable> tableDefList = new ArrayList<>();
            // Fetch the metadata through the JDBC connection
            final DatabaseMetaData metaData = connection.getMetaData();
            resultSet = metaData.getTables(catalog, schema, null, null);
            while (resultSet.next()) {
                // Read catalog, schema, table name and table type
                final String catalogName = resultSet.getString(1);
                final String schemaName = resultSet.getString(2);
                final String tableName = resultSet.getString(3);
                final String tableTypeName = resultSet.getString(4);
                tableDefList.add(
                    new MetaImpl.MetaTable(catalogName, schemaName, tableName, tableTypeName));
            }
            tableDefs = tableDefList;
        }
        final ImmutableMap.Builder<String, JdbcTable> builder = ImmutableMap.builder();
        for (MetaImpl.MetaTable tableDef : tableDefs) {
            final String tableTypeName2 = tableDef.tableType == null
                ? null
                : tableDef.tableType.toUpperCase(Locale.ROOT).replace(' ', '_');
            final TableType tableType = Util.enumVal(TableType.OTHER, tableTypeName2);
            if (tableType == TableType.OTHER && tableTypeName2 != null) {
                System.out.println("Unknown table type: " + tableTypeName2);
            }
            // Each table is finally wrapped into a JdbcTable object
            final JdbcTable table = new JdbcTable(this, tableDef.tableCat,
                tableDef.tableSchem, tableDef.tableName, tableType);
            builder.put(tableDef.tableName, table);
        }
        return builder.build();
    } catch (SQLException e) {
        throw new RuntimeException("Exception while reading tables", e);
    } finally {
        close(connection, null, resultSet);
    }
}
```
SQL execution process
OK, that covers the two simple cases. To finish, here is an overview of the overall Calcite architecture and the complete SQL execution flow.
The entire process is as follows: SQL parsing (Parser) => SQL verification (Validator) => SQL query optimization (optimizer) => SQL generation => SQL execution
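To make the pipeline concrete, here is a minimal sketch that drives the first stages by hand through Calcite's Planner API (Frameworks/Planner). It assumes a schema containing the tables you query (for example the CsvSchema built earlier) has already been registered on the root schema, so treat it as an illustration rather than a drop-in snippet.

```java
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;

public class PipelineDemo {
    public static void main(String[] args) throws Exception {
        SchemaPlus rootSchema = Frameworks.createRootSchema(true);
        // rootSchema.add("CSV", new CsvSchema(new java.io.File("csv"), "scannable"));
        FrameworkConfig config = Frameworks.newConfigBuilder()
            .defaultSchema(rootSchema)
            .build();
        Planner planner = Frameworks.getPlanner(config);

        SqlNode parsed = planner.parse("select * from CSV.USERINFO where AGE > 60"); // Parser
        SqlNode validated = planner.validate(parsed);                                // Validator
        RelNode rel = planner.rel(validated).project();  // relational algebra for the optimizer
        System.out.println(RelOptUtil.toString(rel));    // print the logical plan
    }
}
```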
SQL Parser
Every SQL statement has to be parsed before it can be executed. The parser's job is to turn the tokens in the SQL text into an abstract syntax tree whose nodes are SqlNode objects; in other words, the process is SQL text => SqlNode.
Our previous Demo did not have a custom Parser because Calcite uses its own default Parser (SqlParserImpl).
SqlNode
SqlNode is the core of the whole parsing stage. In the syntax tree, each piece of the statement, such as the select list, the from clause and the where clause, is itself a SqlNode.
parserConfig
Its main job is to set the parameters of the SqlParser (via SqlParserFactory). For example, the capitalization pitfall I ran into during local testing, mentioned above, can be handled here.
If you set caseSensitive to false and leave identifier casing unchanged, table and column names in SQL statements are no longer forced to uppercase; other parameters can be configured as needed.
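As a reference point, here is a small sketch of building such a parser configuration with the SqlParser.Config API (withCaseSensitive, withUnquotedCasing and so on in recent Calcite versions); the SQL string is only a placeholder.

```java
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;

public class ParserConfigDemo {
    public static void main(String[] args) throws Exception {
        SqlParser.Config config = SqlParser.config()
            .withCaseSensitive(false)             // match identifiers case-insensitively
            .withUnquotedCasing(Casing.UNCHANGED) // do not upper-case unquoted identifiers
            .withQuotedCasing(Casing.UNCHANGED);

        SqlParser parser = SqlParser.create("select name from userinfo where age > 60", config);
        SqlNode node = parser.parseQuery();
        System.out.println(node);
    }
}
```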
SQL Validator
A SQL statement first passes through the Parser and then through the validator. Note that the Parser does not verify the semantic correctness of the statement.
The real verification happens in the validator, which checks whether the queried tables exist, whether the referenced fields exist, and whether the types match. This process is fairly involved; the default validator is SqlValidatorImpl.
Query optimization
Optimization operates on relational algebra, for example projections and Cartesian products. Calcite provides many built-in optimizers and rules, and you can also implement optimizers of your own.
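For a flavour of what that looks like, here is a minimal sketch of a rule-based (heuristic) optimization pass with HepPlanner, assuming you already have a RelNode (for instance the rel produced by the Planner sketch earlier); the two rules are just examples.

```java
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.CoreRules;

public final class OptimizeDemo {
    /** Applies a couple of standard rewrite rules to a logical plan. */
    static RelNode optimize(RelNode logicalPlan) {
        HepProgram program = new HepProgramBuilder()
            .addRuleInstance(CoreRules.FILTER_INTO_JOIN) // push filters below joins
            .addRuleInstance(CoreRules.PROJECT_MERGE)    // merge adjacent projections
            .build();
        HepPlanner planner = new HepPlanner(program);
        planner.setRoot(logicalPlan);
        return planner.findBestExp();
    }
}
```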
Adapter
Calcite does not include a storage layer, so it provides an adapter mechanism for accessing external data stores and storage engines.
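As an aside, an adapter's Schema can also be plugged in programmatically instead of through model.json. The sketch below assumes the CsvSchema class written earlier in this article and registers it on the connection's root schema.

```java
import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;

public class ProgrammaticSchemaDemo {
    public static void main(String[] args) throws Exception {
        Connection connection = DriverManager.getConnection("jdbc:calcite:");
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
        SchemaPlus rootSchema = calciteConnection.getRootSchema();
        // CsvSchema is the custom Schema from the CSV example above.
        rootSchema.add("CSV", new CsvSchema(new File("csv"), "scannable"));

        try (Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery("select * from \"CSV\".\"USERINFO\"")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}
```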
Finally, something more advanced
The official site says that a Kafka adapter will eventually be exposed through the public API, making it as convenient to use as the MySQL integration above, but it is not available yet. Here is a way to implement it yourself, so that you can query Kafka topic data and related information directly through SQL.
We have integrated this internally to provide KSQL-like capabilities, and the query results come back fine.
As in the steps above, we need to prepare the library, table names, field names and field types, plus one extra piece: the data source.
- Custom SQL parsing. We did not customize parsing before, but here we need to, because the partition list has to be extracted dynamically from the SQL where condition (a rough sketch of this step appears after the test class below).
- Configure the parser, using the casing configuration mentioned earlier.
- Create a parser, using the default SqlParserImpl.
- Start parsing and generate the AST. Based on the resulting SqlNode we can perform some business-specific validation and parameter extraction.
- The adapter fetches the data from the source:
```java
public class KafkaConsumerAdapter {

    public static List<KafkaResult> executor(KafkaSqlInfo kafkaSql) {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSql.getSeeds());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        List<TopicPartition> topics = new ArrayList<>();
        for (Integer partition : kafkaSql.getPartition()) {
            TopicPartition tp = new TopicPartition(kafkaSql.getTableName(), partition);
            topics.add(tp);
        }
        consumer.assign(topics);
        for (TopicPartition tp : topics) {
            Map<TopicPartition, Long> offsets = consumer.endOffsets(Collections.singleton(tp));
            long position = 500;
            if (offsets.get(tp).longValue() > position) {
                consumer.seek(tp, offsets.get(tp).longValue() - 500);
            } else {
                consumer.seek(tp, 0);
            }
        }

        List<KafkaResult> results = new ArrayList<>();
        boolean flag = true;
        while (flag) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // Convert each record into the result object I defined
                KafkaResult result = new KafkaResult();
                result.setPartition(record.partition());
                result.setOffset(record.offset());
                result.setMsg(record.value());
                result.setKey(record.key());
                results.add(result);
            }
            if (!records.isEmpty()) {
                flag = false;
            }
        }
        consumer.close();
        return results;
    }
}
```
- Execute the query and we get the result we want:
```java
public class TestKafka {

    public static void main(String[] args) throws Exception {
        KafkaService kafkaService = new KafkaService();
        // Put the parsed parameters into my own KafkaSqlInfo object
        KafkaSqlInfo sqlInfo = kafkaService.parseSql(
            "select * from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000");
        // The adapter fetches the data source, pulling records according to sqlInfo
        List<KafkaResult> results = KafkaConsumerAdapter.executor(sqlInfo);
        // Execute the query
        query(sqlInfo.getTableName(), results, sqlInfo.getSql());

        sqlInfo = kafkaService.parseSql(
            "select * from `cmdb-calltopo` where `partition` in (0,1,2) AND msg like '%account%' limit 1000");
        results = KafkaConsumerAdapter.executor(sqlInfo);
        query(sqlInfo.getTableName(), results, sqlInfo.getSql());

        sqlInfo = kafkaService.parseSql(
            "select count(*) AS addad from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000");
        results = KafkaConsumerAdapter.executor(sqlInfo);
        query(sqlInfo.getTableName(), results, sqlInfo.getSql());
    }

    private static void query(String tableName, List<KafkaResult> results, String sql) throws Exception {
        // Create model.json: set my SchemaFactory and the schema name
        String model = createTempJson();
        // Register my table structure: table name, field names and field types
        KafkaTableSchema.generateSchema(tableName, results);
        Properties info = new Properties();
        info.setProperty("lex", Lex.JAVA.toString());
        Connection connection =
            DriverManager.getConnection(Driver.CONNECT_STRING_PREFIX + "model=inline:" + model, info);
        Statement st = connection.createStatement();
        // Execute
        ResultSet result = st.executeQuery(sql);
        ResultSetMetaData rsmd = result.getMetaData();
        List<Map<String, Object>> ret = new ArrayList<>();
        while (result.next()) {
            Map<String, Object> map = new LinkedHashMap<>();
            for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                map.put(rsmd.getColumnName(i), result.getString(rsmd.getColumnName(i)));
            }
            ret.add(map);
        }
        result.close();
        st.close();
        connection.close();
    }

    private static void print(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i));
                if (i < columnCount) {
                    System.out.print(", ");
                } else {
                    System.out.println();
                    break;
                }
            }
        }
    }

    private static String createTempJson() throws IOException {
        JSONObject object = new JSONObject();
        object.put("version", "1.0");
        object.put("defaultSchema", "QAKAFKA");
        JSONArray array = new JSONArray();
        JSONObject tmp = new JSONObject();
        tmp.put("name", "QAKAFKA");
        tmp.put("type", "custom");
        tmp.put("factory", "kafka.KafkaSchemaFactory");
        array.add(tmp);
        object.put("schemas", array);
        return object.toJSONString();
    }
}
```
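The parseSql call above belongs to the author's own KafkaService, which is not shown in this article. Purely as an illustration of how the where condition could be torn apart with Calcite's parser, a partition extractor might look roughly like the hypothetical sketch below; the class and method names are mine, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.avatica.util.Quoting;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParser;

/** Hypothetical sketch: pull the partition list out of "... where `partition` in (0,1,2) ...". */
public class PartitionExtractor {

    public static List<Integer> partitions(String sql) throws Exception {
        SqlParser.Config config = SqlParser.config()
            .withQuoting(Quoting.BACK_TICK)        // allow back-quoted identifiers like `partition`
            .withUnquotedCasing(Casing.UNCHANGED)
            .withCaseSensitive(false);
        SqlNode node = SqlParser.create(sql, config).parseQuery();
        if (node instanceof SqlOrderBy) {          // "limit 1000" wraps the select in a SqlOrderBy
            node = ((SqlOrderBy) node).query;
        }
        List<Integer> result = new ArrayList<>();
        collect(((SqlSelect) node).getWhere(), result);
        return result;
    }

    private static void collect(SqlNode node, List<Integer> out) {
        if (!(node instanceof SqlBasicCall)) {
            return;
        }
        SqlBasicCall call = (SqlBasicCall) node;
        if (call.getKind() == SqlKind.IN
                && call.operand(0) instanceof SqlIdentifier
                && "partition".equalsIgnoreCase(((SqlIdentifier) call.operand(0)).getSimple())) {
            for (SqlNode value : (SqlNodeList) call.operand(1)) {
                out.add(((SqlLiteral) value).intValue(true));
            }
        } else {
            for (SqlNode operand : call.getOperandList()) { // recurse through AND/OR nodes
                collect(operand, out);
            }
        }
    }
}
```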
- Generate a temporary model.json; it used to be file-based, but here it is built as a text string and passed in model=inline mode.
- Register my table structure, table name, field names and field types in memory, and put the data fetched by the adapter into that table as well (a rough sketch of such an in-memory table follows this list).
- Get the connection, execute the query, done!