Today I will show you how to quickly implement a database with the help of a framework: Calcite. Below, I walk through two examples. The first queries file contents directly through SQL statements; the second simulates MySQL query functionality. Finally, I show how to implement SQL queries over Kafka data.
Contents
Calcite
Calcite is a pluggable foundational framework (it is a framework) for optimizing query processing over heterogeneous data sources. It can turn any data, anywhere, into a SQL-queryable engine, and we can selectively use only some of its features.
What can Calcite do?
- Use SQL to access certain data in memory
- Use SQL to access data from a file
- Data access, aggregation, sorting, etc. across data sources (such as joining data from MySQL and Redis data sources)
When we need to build a database ourselves, the data can be in any format, such as text, Word, XML, MySQL, ES, csv, third-party interface data, and so on. We only have the data, and we want it to support dynamic SQL-based insert, delete, update, and query operations.
In addition, data processing systems such as Hive, Drill, Flink, Phoenix, and Storm all use Calcite for SQL parsing and query optimization; some also use it to build their own JDBC drivers.
Glossary
Token
A token is a keyword from standard SQL (think MySQL) or one of the strings between keywords. Each token is encapsulated into a SqlNode, and SqlNode has many derived subclasses; for example, SELECT is encapsulated as SqlSelect. A SqlNode can also be unparsed back into SQL text.
RelDataTypeField
The name and type information of a field
RelDataType
Multiple RelDataTypeField instances form a RelDataType, which can be understood as the structure of a data row
Table
The complete information of a table
Schema
The collection of all metadata; it can be understood as a set of Tables, i.e. the concept of a database (library).
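To make these terms concrete, here is a small sketch of my own (assuming the calcite-core dependency introduced below is on the classpath): it parses a statement into a SqlNode and assembles a RelDataType from field names and types.

import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.type.SqlTypeName;

public class GlossaryDemo {
    public static void main(String[] args) throws Exception {
        // The tokens of the SQL text are parsed into a tree of SqlNode (here the root is a SqlSelect)
        SqlNode node = SqlParser
                .create("select NAME, MONEY from ASSET where NAME = 'aixiaoxian'")
                .parseQuery();
        System.out.println(node.getKind()); // SELECT
        System.out.println(node);           // a SqlNode can be unparsed back into SQL text

        // A RelDataType is assembled from RelDataTypeFields (field name + type), i.e. a row structure
        RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
        RelDataType rowType = typeFactory.builder()
                .add("NAME", SqlTypeName.VARCHAR)
                .add("MONEY", SqlTypeName.VARCHAR)
                .build();
        System.out.println(rowType.getFieldList()); // two RelDataTypeFields
    }
}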
Getting started
1. Import package
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<!-- Latest version, updated on 2022-09-10 -->
<version>1.32.0</version>
</dependency>
2. Create model.json file and table structure csv
model.json describes, or tells Calcite, how to create the Schema; in other words, it tells the framework how to create a library.
{
"version" : "1.0" , //Ignore
"defaultSchema" : "CSV" , //Set the default schema
"schemas" : [ //Can define multiple schemas
{
"name" : "CSV" , // Equivalent The value of namespace and the above defaultSchema corresponds to
"type" : "custom" , //hard-coded
"factory" : "csv.CsvSchemaFactory" , //the class name of factory must be the full path of the factory package you implemented yourself
." operand" : { //You can pass custom parameters here, which will eventually be passed to the factory's operand parameter in the form of a map.
"directory" : "csv" //Directory represents that calcite will read all csv in the csv directory under resources. Files, the Schema created by the factory will build all these files into a Table, which can be understood as the root directory for reading data files. Of course, the name of the key does not have to be directory, you can specify it at will
}
}
]
}
Next, you need a csv file whose header defines the table structure.
NAME:string,MONEY:string
aixiaoxian,100 million
xiaobai,100 million
adong,100 million
maomao,100 million
xixi,100 million
zizi,100 million
wuwu,100 million
kuku,100 million
The structure of the entire project is roughly like this:

3. Factory class that implements Schema
Under the package path specified in the file above, write a CsvSchemaFactory class that implements the SchemaFactory interface and its single method, create, which creates a Schema (library).
public class CsvSchemaFactory implements SchemaFactory {
/**
* Creates the Schema (library).
*
* @param parentSchema Parent schema (usually the root node)
* @param name Name of this schema, as defined in model.json
* @param operand The "operand" map from model.json; custom parameters are passed in here
* @return the created Schema
*/
@Override
public Schema create (SchemaPlus parentSchema, String name,
Map<String, Object> operand) {
final String directory = (String) operand.get( "directory" );
File directoryFile = new File (directory);
return new CsvSchema (directoryFile, "scannable" );
}
}
4. Customize the Schema class
Now that you have the SchemaFactory, you need to customize the Schema class.
A custom Schema needs to implement the Schema interface, but it has too many methods to implement directly, so we extend the official AbstractSchema class instead; then only one method needs to be implemented (if there are other customization requirements, the native interface can still be implemented).
The core logic is the createTableMap method, which is used to create the Tables.
It scans all the csv files under the specified resources directory, maps each file into a Table object, and returns them as a map; several other methods of the Schema interface use this map.
//Just implement this method
@Override
protected Map<String, Table> getTableMap () {
if (tableMap == null ) {
tableMap = createTableMap();
}
return tableMap;
}
private Map<String, Table> createTableMap () {
// Look for files in the directory ending in ".csv"
final Source baseSource = Sources.of(directoryFile);
// Files whose suffix is not the specified one are filtered out automatically; here we keep .csv (and .csv.gz)
File[] files = directoryFile.listFiles((dir, name) -> {
final String nameSansGz = trim(name, ".gz" );
return nameSansGz.endsWith( ".csv" );
});
if (files == null ) {
System.out.println( "directory " + directoryFile + " not found" );
files = new File [ 0 ];
}
// Build a map from table name to table; each file becomes a table.
final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
for (File file : files) {
Source source = Sources.of(file );
final Source sourceSansCsv = source.trimOrNull( ".csv" );
if (sourceSansCsv != null ) {
final Table table = createTable(source);
builder.put(sourceSansCsv.relative(baseSource).path(), table);
}
}
return builder.build();
}
5. Customize Table
The Schema is done, and the csv data files are also mapped to Tables: one csv file corresponds to one Table.
Next we customize the Table. The core of customizing a Table is defining the field names and types, and deciding how to read the csv file.
- First, get the field types and names, that is, the structure of a single table, from the csv file header (the header format is defined by ourselves, and its rules can also be customized).
/**
* Base class for table that reads CSV files.
*/
public abstract class CsvTable extends AbstractTable {
protected final Source source;
protected final @Nullable RelProtoDataType protoRowType;
private @Nullable RelDataType rowType;
private @Nullable List<RelDataType> fieldTypes;
/**
* Creates a CsvTable.
*/
CsvTable(Source source, @Nullable RelProtoDataType protoRowType) {
this .source = source;
this .protoRowType = protoRowType;
}
/**
* To create a CsvTable, extend AbstractTable and implement its getRowType method, which returns the current table structure.
* There are many kinds of Table, such as views. AbstractTable provides default implementations for some Table methods, e.g. getJdbcTableType, which defaults to the TABLE type. If there are other customization requirements, the Table interface can be implemented directly.
* This is very similar to AbstractSchema.
*/
@Override
public RelDataType getRowType (RelDataTypeFactory typeFactory) {
if (protoRowType != null ) {
return protoRowType.apply(typeFactory);
}
if (rowType == null ) {
rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
null );
}
return rowType;
}
/**
* Returns the field types of this CSV table.
*/
public List<RelDataType> getFieldTypes (RelDataTypeFactory typeFactory) {
if (fieldTypes == null ) {
fieldTypes = new ArrayList <>();
CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
fieldTypes);
}
return fieldTypes;
}
public static RelDataType deduceRowType (JavaTypeFactory typeFactory,
Source source, @Nullable List<RelDataType> fieldTypes) {
final List<RelDataType> types = new ArrayList <>();
final List<String> names = new ArrayList <>();
try ( CSVReader reader = openCsv(source)) {
String[] strings = reader.readNext();
if (strings == null ) {
strings = new String []{ "EmptyFileHasNoColumns:boolean" };
}
for (String string : strings) {
final String name;
final RelDataType fieldType;
//Simply read the string. The name is in front of the colon, and the type is after the colon.
final int colon = string.indexOf( ':' );
if (colon >= 0 ) {
name = string.substring( 0 , colon);
String typeString = string.substring(colon + 1 );
Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString);
if (decimalMatcher.matches()) {
int precision = Integer.parseInt(decimalMatcher .group( 1 ));
int scale = Integer.parseInt(decimalMatcher.group( 2 ));
fieldType = parseDecimalSqlType(typeFactory, precision, scale);
} else {
switch (typeString) {
case "string" :
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
break ;
case "boolean" :
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN);
break ;
case "byte" :
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT);
break ;
case "char" :
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR);
break ;
case "short" :
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT);
break ;
case "int" :
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER);
break ;
case "long" :
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT);
break ;
case "float" :
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL);
break ;
case "double" :
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE);
break ;
case "date" :
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE);
break ;
case "timestamp" :
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP);
break ;
case "time" :
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME);
break ;
default :
LOGGER.warn(
"Found unknown type: {} in file: {} for column: {}. Will assume the type of "
+ "column is string." ,
typeString, source.path(), name);
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
break ;
}
}
} else {
// If no type is defined, default to VARCHAR and use the whole string as the field name
name = string;
fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
}
names.add(name);
types.add(fieldType);
if (fieldTypes != null ) {
fieldTypes.add(fieldType);
}
}
} catch (IOException e) {
// ignore
}
if (names.isEmpty()) {
names.add( "line" );
types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
}
return typeFactory.createStructType(Pair.zip(names, types));
}
}
- Obtain the data in the file.
After obtaining the table's field names and types above, the last step is to obtain the data in the file. We customize a class that implements the ScannableTable interface and its single method, scan. The method essentially reads the file and converts each line's values to the field types obtained above.
@Override
public Enumerable<Object[]> scan(DataContext root) {
JavaTypeFactory typeFactory = root.getTypeFactory();
final List<RelDataType> fieldTypes = getFieldTypes(typeFactory);
final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size ());
final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
return new AbstractEnumerable < @Nullable Object[]>() {
@Override
public Enumerator< @Nullable Object[]> enumerator() {
//return Our custom class for reading data
return new CsvEnumerator <>(source, cancelFlag, false , null ,
CsvEnumerator.arrayConverter(fieldTypes, fields, false ));
}
};
}
public CsvEnumerator (Source source, AtomicBoolean cancelFlag, boolean stream,
@Nullable String @Nullable [] filterValues, RowConverter<E> rowConverter) {
this .cancelFlag = cancelFlag;
this .rowConverter = rowConverter;
this .filterValues = filterValues == null ? null
: ImmutableNullableList.copyOf(filterValues);
try {
this .reader = openCsv(source);
//Skip the first line because the first line is where the type and name are defined
this .reader.readNext(); // skip header row
} catch (IOException e) {
throw new RuntimeException (e);
}
}
//CsvEnumerator must implement Calcite's own Enumerator, which has current and moveNext methods: current returns the record at the cursor, and moveNext advances the cursor to the next record. The official example also defines a row converter that turns the strings in the csv file into the types declared in the file header; we implement that ourselves.
@Override
public E current () {
return castNonNull(current);
}
@Override
public boolean moveNext () {
try {
outer:
for (; ; ) {
if (cancelFlag.get()) {
return false ;
}
final String[] strings = reader.readNext();
if (strings == null ) {
current = null ;
reader.close();
return false ;
}
if (filterValues != null ) {
for ( int i = 0 ; i < strings.length; i++) {
String filterValue = filterValues.get(i);
if (filterValue != null ) {
if (!filterValue.equals(strings [i])) {
continue outer;
}
}
}
}
current = rowConverter.convertRow(strings);
return true ;
}
} catch (IOException e) {
throw new RuntimeException (e);
}
}
protected @Nullable Object convert ( @Nullable RelDataType fieldType, @Nullable String string) {
if (fieldType == null || string == null ) {
return string;
}
switch (fieldType.getSqlTypeName()) {
case BOOLEAN:
if (string.length() == 0 ) {
return null ;
}
return Boolean.parseBoolean(string);
case TINYINT:
if (string.length() == 0 ) {
return null ;
}
return Byte.parseByte(string);
case SMALLINT:
if (string.length() == 0 ) {
return null ;
}
return Short.parseShort(string);
case INTEGER:
if (string.length() == 0 ) {
return null ;
}
return Integer.parseInt(string);
case BIGINT:
if (string.length() == 0 ) {
return null ;
}
return Long.parseLong(string);
case FLOAT:
if (string.length() == 0 ) {
return null ;
}
return Float.parseFloat(string);
case DOUBLE:
if (string.length() == 0 ) {
return null ;
}
return Double.parseDouble(string);
case DECIMAL:
if (string.length() == 0 ) {
return null ;
}
return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string);
case DATE:
if (string.length() == 0 ) {
return null ;
}
try {
Date date = TIME_FORMAT_DATE.parse(string);
return ( int ) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY);
} catch (ParseException e) {
return null ;
}
case TIME:
if (string.length() == 0 ) {
return null ;
}
try {
Date date = TIME_FORMAT_TIME.parse(string);
return ( int ) date.getTime();
} catch (ParseException e) {
return null ;
}
case TIMESTAMP:
if (string.length() == 0 ) {
return null ;
}
try {
Date date = TIME_FORMAT_TIMESTAMP.parse(string);
return date.getTime();
} catch (ParseException e) {
return null ;
}
case VARCHAR:
default :
return string;
}
}
6. Finally
At this point, we have prepared everything we need: the library, table names, field names, and field types. Next, we write SQL statements to query our data files.
Create a few test data files. In the project structure above, I created two csv files, USERINFO.csv and ASSET.csv, and then wrote a test class.
Run it, and you can query the data directly through SQL statements.
public class Test {
public static void main (String[] args) throws SQLException {
Connection connection = null ;
Statement statement = null ;
try {
Properties info = new Properties ();
info.put( "model" , Sources.of(Test.class.getResource( "/model.json" )).file().getAbsolutePath());
connection = DriverManager.getConnection( "jdbc:calcite:" , info);
statement = connection.createStatement();
print(statement.executeQuery( "select * from asset " ));
print(statement.executeQuery( " select * from userinfo " ));
print(statement.executeQuery( " select age from userinfo where name ='aixiaoxian' " ));
print(statement.executeQuery( " select * from userinfo where age >60 " ));
print(statement.executeQuery( " select * from userinfo where name like 'a%' " ));
} finally {
connection.close();
}
}
private static void print (ResultSet resultSet) throws SQLException {
final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
while (resultSet.next()) {
for ( int i = 1 ; ; i++) {
System.out.print(resultSet.getString(i));
if (i < columnCount) {
System.out.print( ", " );
} else {
System.out.println();
break ;
}
}
}
}
}
Query results:

I ran into two pitfalls during testing; you can avoid them if you try this yourself.
- By default, Calcite converts all table names and column names in your SQL statements to uppercase. Since the name of the csv file (the same applies to other files) is the table name, your file names must be uppercase unless you customize the rules.
- Calcite has some built-in keywords that cannot be used as table names, otherwise the query fails. For example, the USER table from my original user.csv could not be found, so I renamed it to USERINFO. This is similar to MySQL's reserved keywords, and it can also be changed through configuration, as shown below.
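For instance, the lexical behavior can be relaxed on the Calcite JDBC connection itself. A minimal sketch, assuming the same model.json as above; lex and caseSensitive are standard Calcite connection properties, and the values here are just one reasonable choice:

Properties info = new Properties();
info.put("model", Sources.of(Test.class.getResource("/model.json")).file().getAbsolutePath());
info.put("lex", "MYSQL");           // backtick quoting, identifiers kept as written
info.put("caseSensitive", "false"); // match table and column names case-insensitively
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);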
MySQL Demo
- First, prepare the things Calcite needs: library, table names, field names, field types.
If the data source is MySQL, we do not need to define any of these in the Java service; they are created directly in the MySQL client. Here, two tables are created for testing, just like our csv files.
CREATE TABLE `USERINFO1` (
  `NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  `AGE` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

CREATE TABLE `ASSET` (
  `NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  `MONEY` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
- Unlike the csv case above, the SchemaFactory and Schema do not need to be created, because Calcite provides a MySQL adapter by default.
- In fact, the two steps above are not even necessary. What we really need to do is tell Calcite the JDBC connection information, which is again defined in the model.json file.
{
"version" : "1.0" ,
"defaultSchema" : "Demo" ,
"schemas" : [
{
"name" : "Demo" ,
"type" : "custom" ,
// This is Calcite's built-in JDBC SchemaFactory; internally the process is the same as what we defined above. We'll take a brief look at its source code below.
"factory" : "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory" ,
"operand" : {
// I am using MySQL 8+, so note the driver class name here
"jdbcDriver" : "com.mysql.cj.jdbc.Driver" ,
"jdbcUrl" : "jdbc:mysql://localhost:3306/irving" ,
"jdbcUser" : "root" ,
"jdbcPassword" : "123456"
}
}
]
}
- Introduce the Mysql driver package into the project
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.30</version>
</dependency>
- Write the test class; with that, all the functionality is in place.
public class TestMysql {
public static void main (String[] args) throws SQLException {
Connection connection = null ;
Statement statement = null ;
try {
Properties info = new Properties ();
info.put( "model" , Sources.of(TestMysql.class.getResource( "/mysqlmodel.json" )).file().getAbsolutePath());
connection = DriverManager.getConnection( "jdbc:calcite:" , info);
statement = connection.createStatement();
statement.executeUpdate( " insert into userinfo1 values ('xxx',12) " );
print(statement.executeQuery( "select * from asset " ));
print(statement.executeQuery( " select * from userinfo1 " ));
print(statement.executeQuery( " select age from userinfo1 where name ='aixiaoxian' " ));
print(statement.executeQuery( " select * from userinfo1 where age >60 " ));
print(statement.executeQuery( " select * from userinfo1 where name like 'a%' " ));
} finally {
connection.close();
}
}
private static void print (ResultSet resultSet) throws SQLException {
final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
while (resultSet.next()) {
for ( int i = 1 ; ; i++) {
System.out.print(resultSet.getString(i));
if (i < columnCount) {
System.out.print( ", " );
} else {
System.out.println();
break ;
}
}
}
}
}
Query results:

MySQL implementation principle
Above, we specified the org.apache.calcite.adapter.jdbc.JdbcSchema$Factory class in the model.json file; let's look at its code.
This class puts the Factory and the Schema together, but it still comes down to calling the SchemaFactory's create method to create a Schema, the same process we customized above.
JdbcSchema is also a subclass of Schema, so it likewise implements the getTable method (we did this above too, obtaining the table structure, field types, and names from the csv file header). JdbcSchema does it by connecting to the MySQL server, querying the metadata, and then wrapping that information into the object format Calcite requires.
As with csv, the same two points need attention here: capitalization and keywords.
public static JdbcSchema create (
SchemaPlus parentSchema,
String name,
Map<String, Object> operand) {
DataSource dataSource;
try {
final String dataSourceName = (String) operand.get( "dataSource" );
if (dataSourceName != null ) {
dataSource =
AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);
} else {
//We end up here: the JDBC connection info specified in model.json is used to create a DataSource
final String jdbcUrl = (String) requireNonNull(operand.get( "jdbcUrl" ), "jdbcUrl" );
final String jdbcDriver = (String) operand.get( "jdbcDriver" );
final String jdbcUser = (String) operand.get( "jdbcUser" );
final String jdbcPassword = (String) operand.get( "jdbcPassword" );
dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
}
} catch (Exception e) {
throw new RuntimeException ( "Error while reading dataSource" , e);
}
String jdbcCatalog = (String) operand.get( "jdbcCatalog" );
String jdbcSchema = (String) operand.get( "jdbcSchema" );
String sqlDialectFactory = (String) operand.get( "sqlDialectFactory" );
if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {
return JdbcSchema.create(
parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
} else {
SqlDialectFactory factory = AvaticaUtils.instantiatePlugin(
SqlDialectFactory.class, sqlDialectFactory);
return JdbcSchema.create(
parentSchema, name, dataSource, factory, jdbcCatalog, jdbcSchema);
}
}
@Override public @Nullable Table getTable (String name) {
return getTableMap( false ).get(name);
}
private synchronized ImmutableMap<String, JdbcTable> getTableMap (
boolean force) {
if (force || tableMap == null ) {
tableMap = computeTables();
}
return tableMap;
}
private ImmutableMap<String, JdbcTable> computeTables () {
Connection connection = null ;
ResultSet resultSet = null ;
try {
connection = dataSource.getConnection();
final Pair< @Nullable String, @Nullable String> catalogSchema = getCatalogSchema(connection);
final String catalog = catalogSchema.left;
final String schema = catalogSchema.right;
final Iterable<MetaImpl.MetaTable> tableDefs;
Foo threadMetadata = THREAD_METADATA.get( );
if (threadMetadata != null ) {
tableDefs = threadMetadata.apply(catalog, schema);
} else {
final List<MetaImpl.MetaTable> tableDefList = new ArrayList <>();
// Get metadata
final DatabaseMetaData metaData = connection.getMetaData();
resultSet = metaData.getTables(catalog, schema, null , null );
while (resultSet.next()) {
//Get the catalog, schema, table name and table type
final String catalogName = resultSet.getString( 1 );
final String schemaName = resultSet. getString( 2 );
final String tableName = resultSet.getString( 3 );
final String tableTypeName = resultSet.getString( 4 );
tableDefList.add(
new MetaImpl .MetaTable(catalogName, schemaName, tableName,
tableTypeName));
}
tableDefs = tableDefList;
}
final ImmutableMap.Builder<String, JdbcTable> builder =
ImmutableMap.builder();
for (MetaImpl.MetaTable tableDef : tableDefs) {
final String tableTypeName2 =
tableDef.tableType == null
? null
: tableDef.tableType.toUpperCase(Locale.ROOT).replace( ' ' , '_' );
final TableType tableType =
Util.enumVal(TableType.OTHER, tableTypeName2);
if (tableType == TableType.OTHER && tableTypeName2 != null ) {
System.out.println( "Unknown table type: " + tableTypeName2);
}
//Finally encapsulated into a JdbcTable object
final JdbcTable table =
new JdbcTable ( this , tableDef.tableCat, tableDef.tableSchem,
tableDef.tableName, tableType);
builder.put(tableDef.tableName, table);
}
return builder.build();
} catch (SQLException e) {
throw new RuntimeException (
"Exception while reading tables" , e);
} finally {
close(connection, null , resultSet);
}
}
SQL execution process
OK, two simple cases have now been demonstrated. To finish, let me add the overall Calcite architecture and the end-to-end SQL execution process.

The entire process is: SQL parsing (Parser) => SQL validation (Validator) => SQL query optimization (Optimizer) => SQL generation => SQL execution
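The same pipeline can also be driven programmatically through Calcite's Frameworks/Planner API. Here is a minimal sketch of my own (not from the original article); it reuses the csv model.json from the first example, and the schema name "CSV" and the query are assumptions based on that example:

import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.util.Sources;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class PlannerDemo {
    public static void main(String[] args) throws Exception {
        // Open a Calcite connection on the csv model from the first example
        Properties info = new Properties();
        info.put("model", Sources.of(PlannerDemo.class.getResource("/model.json")).file().getAbsolutePath());
        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
        SchemaPlus rootSchema = connection.unwrap(CalciteConnection.class).getRootSchema();

        FrameworkConfig config = Frameworks.newConfigBuilder()
                .parserConfig(SqlParser.config().withCaseSensitive(false))
                .defaultSchema(rootSchema.getSubSchema("CSV"))
                .build();
        Planner planner = Frameworks.getPlanner(config);

        SqlNode parsed = planner.parse("select NAME from USERINFO where AGE > 60"); // Parser: SQL text -> SqlNode
        SqlNode validated = planner.validate(parsed);                               // Validator: checks tables, columns, types
        RelRoot relRoot = planner.rel(validated);                                    // SqlNode -> relational algebra
        System.out.println(RelOptUtil.toString(relRoot.rel));                        // the logical plan handed to the optimizer
    }
}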
SQL Parser
Every SQL statement must be parsed by the SQL parser before execution. The parser's job is to turn the tokens in the SQL into an abstract syntax tree, where each node of the tree is a SqlNode. The process is essentially SQL text => SqlNode.
Our previous Demo did not have a custom Parser because Calcite uses its own default Parser (SqlParserImpl).


SqlNode
SqlNode is the core of the whole parse. For example, in the picture you can see that each part, such as the select, from, and where clauses, is itself a SqlNode.

The parserConfig method mainly sets the parameters of the SqlParserFactory. For example, the capitalization pitfall I ran into during local testing, mentioned above, can be handled here.
If you set caseSensitive=false, table names and column names in the SQL statement are matched case-insensitively instead of being forced to uppercase. The default configuration is shown below; other parameters can be configured as needed, for example as in the sketch that follows.
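A small sketch of my own showing how such parser options can be set through SqlParser.Config (the with* methods are part of Calcite's immutable parser config, and Casing/Quoting come from org.apache.calcite.avatica.util; the chosen values are only an example):

SqlParser.Config parserConfig = SqlParser.config()
        .withCaseSensitive(false)             // do not match identifiers case-sensitively
        .withUnquotedCasing(Casing.UNCHANGED) // keep unquoted identifiers as written
        .withQuotedCasing(Casing.UNCHANGED)
        .withQuoting(Quoting.BACK_TICK);      // allow `backtick`-quoted identifiers
SqlParser parser = SqlParser.create("select * from userinfo where age > 60", parserConfig);
SqlNode ast = parser.parseStmt();             // the parsed SqlNode tree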

SQL Validator
The SQL statement first goes through the Parser and then through the validator. Note that the Parser does not check semantic correctness.

The real verification happens in the validator, which checks whether the queried table exists, whether the queried fields exist, and whether the types match. This process is fairly complex; the default validator is SqlValidatorImpl.
Query optimization
This stage works on relational algebra, such as projections and Cartesian products. Calcite provides many built-in optimizers, and you can also implement your own optimizer.
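As one illustration (my own sketch, not from the article), Calcite's heuristic HepPlanner can be fed a RelNode, such as the relRoot.rel produced by the Planner sketch earlier, together with a set of rewrite rules. HepPlanner, HepProgramBuilder, and CoreRules are Calcite classes; the wrapper method and the single rule chosen here are just an example:

// relNode would be e.g. the RelRoot.rel produced by the Planner sketch above
static RelNode optimize(RelNode relNode) {
    HepProgram program = new HepProgramBuilder()
            .addRuleInstance(CoreRules.FILTER_INTO_JOIN) // push filters below joins
            .build();
    HepPlanner hepPlanner = new HepPlanner(program);
    hepPlanner.setRoot(relNode);
    return hepPlanner.findBestExp(); // the rewritten plan
}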
Adapter
Calcite does not include a storage layer, so it provides an adapter mechanism for accessing external data stores and storage engines.
Finally, an advanced example
The official website says a Kafka adapter will be supported in the public API in the future, making it as convenient to use as the MySQL integration above. It is not supported yet, so here is a way to implement it yourself and query Kafka topic data and related information directly through SQL.
Internally we have integrated and implemented this KSQL-like capability, and the query results come out fine.

As in the steps above, we need to prepare the library, table names, field names, field types, and, additionally, the data source.
- Custom SQL parsing. We did not need custom parsing before; here we do, because I need to dynamically parse the partition condition in the SQL where clause.
- Configure the parser; this is the case configuration mentioned in the previous section.
- Create a parser, using the default SqlParserImpl.
- Start parsing and generate the AST. Based on the generated SqlNode, we can do some business-related validation and parameter extraction.
- The adapter gets the data source:
public class KafkaConsumerAdapter {
public static List<KafkaResult> executor (KafkaSqlInfo kafkaSql) {
Properties props = new Properties ();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSql.getSeeds());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" );
KafkaConsumer<String, String> consumer = new KafkaConsumer <>(props);
List<TopicPartition> topics = new ArrayList <>();
for (Integer partition : kafkaSql.getPartition()) {
TopicPartition tp = new TopicPartition (kafkaSql.getTableName(), partition);
topics.add(tp);
}
consumer.assign(topics);
for (TopicPartition tp : topics) {
Map<TopicPartition, Long> offsets = consumer.endOffsets(Collections.singleton(tp));
long position = 500 ;
if (offsets.get(tp).longValue() > position) {
consumer.seek(tp, offsets.get(tp).longValue() - 500 );
} else {
consumer.seek(tp, 0 );
}
}
List<KafkaResult> results = new ArrayList <>();
boolean flag = true ;
while (flag) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis( 100 ));
for (ConsumerRecord<String, String> record : records) {
//Convert to the object collection I defined
KafkaResult result = new KafkaResult ();
result.setPartition(record.partition());
result.setOffset(record.offset());
result.setMsg(record.value());
result.setKey(record.key());
results.add(result);
}
if (!records.isEmpty()) {
flag = false ;
}
}
consumer.close();
return results;
}
}
- Execute the query and we get the effect we want:
public class TestKafka {
public static void main (String[] args) throws Exception {
KafkaService kafkaService = new KafkaService ();
//Put the parsed parameters in my own defined kafkaSqlInfo object
KafkaSqlInfo sqlInfo = kafkaService.parseSql( "select * from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 " );
//The adapter obtains the data source, mainly polling data from the above sqlInfo object
List<KafkaResult> results = KafkaConsumerAdapter.executor(sqlInfo);
//Execute query
query(sqlInfo.getTableName(), results, sqlInfo.getSql());
sqlInfo = kafkaService.parseSql( "select * from `cmdb-calltopo` where `partition` in (0,1,2) AND msg like '%account%' limit 1000 " );
results = KafkaConsumerAdapter.executor(sqlInfo);
query(sqlInfo.getTableName(), results, sqlInfo.getSql());
sqlInfo = kafkaService.parseSql( "select count(*) AS addad from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 " );
results = KafkaConsumerAdapter.executor(sqlInfo);
query(sqlInfo.getTableName(), results, sqlInfo.getSql());
}
private static void query (String tableName, List<KafkaResult> results,
String sql) throws Exception {
//Create model.json, set my SchemaFactory, set the library name
String model = createTempJson();
//Set my table structure, table name, table field name and type
KafkaTableSchema.generateSchema(tableName, results);
Properties info = new Properties ();
info.setProperty( "lex" , Lex.JAVA.toString());
Connection connection = DriverManager.getConnection(Driver.CONNECT_STRING_PREFIX + "model=inline:" + model, info);
Statement st = connection.createStatement();
//Execute
ResultSet result = st.executeQuery(sql);
ResultSetMetaData rsmd = result.getMetaData();
List<Map<String, Object>> ret = new ArrayList <>();
while (result.next()) {
Map<String, Object> map = new LinkedHashMap <>();
for ( int i = 1 ; i <= rsmd.getColumnCount(); i++) {
map.put(rsmd.getColumnName(i), result.getString(rsmd.getColumnName(i)));
}
ret.add(map);
}
result.close();
st.close();
connection.close();
}
private static void print (ResultSet resultSet) throws SQLException {
final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
while (resultSet.next()) {
for ( int i = 1 ; ; i++) {
System.out.print(resultSet.getString(i));
if (i < columnCount) {
System.out.print( ", " );
} else {
System.out.println();
break ;
}
}
}
}
private static String createTempJson () throws IOException {
JSONObject object = new JSONObject ();
object.put( "version" , "1.0" );
object.put( "defaultSchema" , "QAKAFKA" );
JSONArray array = new JSONArray ();
JSONObject tmp = new JSONObject ();
tmp.put( "name" , "QAKAFKA" );
tmp.put( "type" , "custom" );
tmp.put( "factory" , "kafka.KafkaSchemaFactory" );
array.add(tmp);
object.put( "schemas" , array);
return object.toJSONString();
}
}
- Generate a temporary model.json: previously it was file-based, here it is an inline text string (the model=inline: pattern).
- Set my table structure, table name, field names, and field types, keep them in memory, and also put the data fetched by the adapter into the table (a sketch of this part follows below).
- Get the connection, execute the query, done!
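The article does not show the kafka.KafkaSchemaFactory and KafkaTableSchema classes referenced above. Below is a minimal sketch of what such an in-memory implementation could look like: the class and method names mirror the ones used in the test code, but the bodies, the fixed partition/offset/key/msg row structure, and the KafkaResult getters are my assumptions, not the original code.

import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeName;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class KafkaSchemaFactory implements SchemaFactory {
    @Override
    public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
        return new AbstractSchema() {
            @Override
            protected Map<String, Table> getTableMap() {
                // Tables registered by KafkaTableSchema.generateSchema before the query runs
                return KafkaTableSchema.tables();
            }
        };
    }
}

class KafkaTableSchema {
    private static final Map<String, Table> TABLES = new ConcurrentHashMap<>();

    static void generateSchema(String tableName, List<KafkaResult> results) {
        TABLES.put(tableName, new KafkaResultTable(results));
    }

    static Map<String, Table> tables() {
        return TABLES;
    }

    /** An in-memory table backed by the records polled by KafkaConsumerAdapter. */
    static class KafkaResultTable extends AbstractTable implements ScannableTable {
        private final List<KafkaResult> results;

        KafkaResultTable(List<KafkaResult> results) {
            this.results = results;
        }

        @Override
        public RelDataType getRowType(RelDataTypeFactory typeFactory) {
            // Fixed row structure: partition, offset, key, msg
            return typeFactory.builder()
                    .add("partition", SqlTypeName.INTEGER)
                    .add("offset", SqlTypeName.BIGINT)
                    .add("key", SqlTypeName.VARCHAR)
                    .add("msg", SqlTypeName.VARCHAR)
                    .build();
        }

        @Override
        public Enumerable<Object[]> scan(DataContext root) {
            List<Object[]> rows = new ArrayList<>();
            for (KafkaResult r : results) {
                // Assumes KafkaResult exposes getters for the fields set by the adapter
                rows.add(new Object[]{r.getPartition(), r.getOffset(), r.getKey(), r.getMsg()});
            }
            return Linq4j.asEnumerable(rows);
        }
    }
}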
