How to write a database in 10 minutes

Today I will show you how to implement a database quickly with the help of a framework: Calcite. I will walk through two examples. The first queries file contents directly with SQL statements, and the second simulates MySQL's query functionality. Finally, I will show you how to run SQL queries on Kafka data.


Calcite is a pluggable foundational framework for optimizing query processing over heterogeneous data sources. It can put a SQL-based DML engine on top of any data (Any data, Anywhere), and we can choose to use only some of its features.

What can Calcite do?

  1. Use SQL to access certain data in memory
  2. Use SQL to access data from a file
  3. Access, aggregate and sort data across data sources (for example, joining data from MySQL and Redis)

When we build a database ourselves, the data can be in any format: text, Word, XML, MySQL, Elasticsearch, CSV, third-party API data, and so on. We only have the data, and we want it to support create, read, update and delete operations through dynamic SQL.

In addition, data processing systems such as Hive, Drill, Flink, Phoenix and Storm all use Calcite for SQL parsing and query optimization, and some also use it to build their own JDBC drivers.



SqlNode: the parser splits standard SQL (think of MySQL syntax) at its keywords and the strings between them. Each token is wrapped in a SqlNode, and SqlNode has many subclasses; for example, a SELECT is wrapped as a SqlSelect. A SqlNode can also be converted back (unparsed) into SQL text.


RelDataTypeField: the name and type information of a single field.


RelDataType: several RelDataTypeFields together form a RelDataType, which can be understood as the structure of a data row.


Table: the complete information of one table.


Schema: the combination of all the metadata, which can be understood as a collection of Tables, that is, a database.
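To make the nesting of these four concepts concrete, here is a plain-Java sketch. FieldDef, RowTypeDef, TableDef and SchemaDef are hypothetical records that only mirror the relationship between RelDataTypeField, RelDataType, Table and Schema; they are not Calcite's API, which is richer (a Calcite Table, for example, builds its row type through a RelDataTypeFactory).

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins that mirror Calcite's metadata nesting; not Calcite classes.
// RelDataTypeField ~ one column: a name plus a SQL type name.
record FieldDef(String name, String sqlType) {}

// RelDataType ~ the row structure: an ordered list of fields.
record RowTypeDef(List<FieldDef> fields) {
    Optional<FieldDef> field(String name) {
        return fields.stream().filter(f -> f.name().equals(name)).findFirst();
    }
}

// Table ~ a named table together with its row type.
record TableDef(String name, RowTypeDef rowType) {}

// Schema ~ a "database": a collection of tables looked up by name.
record SchemaDef(String name, Map<String, TableDef> tables) {
    Optional<TableDef> table(String tableName) {
        return Optional.ofNullable(tables.get(tableName));
    }
}

class MetadataDemo {
    public static void main(String[] args) {
        RowTypeDef assetRow = new RowTypeDef(List.of(
                new FieldDef("NAME", "VARCHAR"),
                new FieldDef("MONEY", "VARCHAR")));
        SchemaDef csv = new SchemaDef("CSV",
                Map.of("ASSET", new TableDef("ASSET", assetRow)));
        // Resolve CSV.ASSET.MONEY by walking schema -> table -> field
        String type = csv.table("ASSET")
                .flatMap(t -> t.rowType().field("MONEY"))
                .map(FieldDef::sqlType)
                .orElse("<not found>");
        System.out.println(type); // prints VARCHAR
    }
}
```

Resolving a name walks schema, then table, then field, which is roughly the lookup a validator has to perform.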

Getting started

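1. Add the dependency

    <dependency>
        <groupId>org.apache.calcite</groupId>
        <artifactId>calcite-core</artifactId>
        <!-- The latest version at the time of writing, released on 2022-09-10 -->
        <version>1.32.0</version>
    </dependency>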

2. Create the model.json file and the CSV table files

model.json tells Calcite how to create a Schema, in other words, how to build a database.

{
  "version": "1.0",         // can be ignored
  "defaultSchema": "CSV",   // the default schema
  "schemas": [              // multiple schemas can be defined
    {
      "name": "CSV",        // acts as a namespace; matches defaultSchema above
      "type": "custom",     // fixed value
      "factory": "csv.CsvSchemaFactory", // fully qualified class name of your own factory implementation
      "operand": {          // custom parameters, passed to the factory's create method as the operand map
        "directory": "csv"  // root directory of the data files (see below)
      }
    }
  ]
}

"directory" means Calcite reads all the CSV files in the csv directory under resources, and the Schema created by the factory turns each of these files into a Table. Think of it as the root directory for the data files. The key does not have to be named directory; you can choose any name, as long as the factory reads the same key.

Next, define a CSV file. Its first line (the header) defines the table structure.

    NAME:string,MONEY:string
    aixiaoxian,100 million
    xiaobai,100 million
    adong,100 million
    maomao,100 million
    xixi,100 million
    zizi,100 million
    wuwu,100 million
    kuku,100 million
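The header convention is simply `name:type` for each column. As a minimal sketch under simplifying assumptions (comma splitting without CSV quoting, and only a subset of the type names), the code below parses such a header in plain Java. Like the deduceRowType method shown later, it falls back to VARCHAR for an unknown type and treats a cell without a colon as a VARCHAR column named after the whole cell. HeaderParser and Column are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: parses a "NAME:string,MONEY:string" style header line.
class HeaderParser {
    record Column(String name, String sqlType) {}

    // Subset of the type names recognized by CsvEnumerator.deduceRowType
    private static final Map<String, String> TYPES = Map.of(
            "string", "VARCHAR", "boolean", "BOOLEAN", "int", "INTEGER",
            "long", "BIGINT", "double", "DOUBLE", "date", "DATE",
            "timestamp", "TIMESTAMP");

    static List<Column> parse(String headerLine) {
        List<Column> columns = new ArrayList<>();
        for (String raw : headerLine.split(",")) {
            String cell = raw.trim();
            int colon = cell.indexOf(':');
            if (colon < 0) {
                // No declared type: the whole cell is the name, the type defaults to VARCHAR
                columns.add(new Column(cell, "VARCHAR"));
            } else {
                String name = cell.substring(0, colon);
                String type = cell.substring(colon + 1);
                // Unknown type names fall back to VARCHAR
                columns.add(new Column(name, TYPES.getOrDefault(type, "VARCHAR")));
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        for (Column c : parse("NAME:string,MONEY:string")) {
            System.out.println(c.name() + " -> " + c.sqlType()); // NAME -> VARCHAR, then MONEY -> VARCHAR
        }
    }
}
```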

The structure of the entire project is roughly like this:

3. Implement the Schema factory class

In the package specified in model.json, write a CsvSchemaFactory class that implements the SchemaFactory interface and its only method, create, which creates a Schema (a database).

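import org.apache.calcite.model.ModelHandler;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
import java.io.File;
import java.util.Map;

public class CsvSchemaFactory implements SchemaFactory {
    /**
     * parentSchema: the parent node, usually the root schema.
     * name: the schema name defined in model.json.
     * operand: the "operand" object from model.json; custom parameters are passed here.
     */
    @Override
    public Schema create(SchemaPlus parentSchema, String name,
                         Map<String, Object> operand) {
        final String directory = (String) operand.get("directory");
        // Resolve a relative "directory" against the folder that contains model.json
        final File base = (File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName);
        File directoryFile = new File(directory);
        if (base != null && !directoryFile.isAbsolute()) {
            directoryFile = new File(base, directory);
        }
        return new CsvSchema(directoryFile, "scannable");
    }
}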

4. Implement a custom Schema class

With the SchemaFactory in place, we need a custom Schema class.

A custom schema must implement the Schema interface, but that interface has too many methods to implement directly. Instead, we extend the official AbstractSchema class, so that only one method needs to be overridden (if you have other custom requirements, you can implement the native interface directly).

The core logic is in the createTableMap method, which creates the Table objects.

It scans all the CSV files in the specified directory under resources, maps each file to a Table object, and returns them as a map. Several other methods of the Schema interface use this map.

    // Fields of CsvSchema (which extends AbstractSchema) used below:
    //   private final File directoryFile;  private Map<String, Table> tableMap;

    // This is the only method we need to override
    @Override
    protected Map<String, Table> getTableMap() {
        if (tableMap == null) {
            tableMap = createTableMap();
        }
        return tableMap;
    }

    private Map<String, Table> createTableMap() {
        // Look for files in the directory ending in ".csv" (optionally gzipped: ".csv.gz")
        final Source baseSource = Sources.of(directoryFile);
        // Files with any other suffix are filtered out
        File[] files = directoryFile.listFiles((dir, name) -> {
            final String nameSansGz = trim(name, ".gz");
            return nameSansGz.endsWith(".csv");
        });
        if (files == null) {
            System.out.println("directory " + directoryFile + " not found");
            files = new File[0];
        }
        // Build a map from table name to table; each file becomes a table.
        final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
        for (File file : files) {
            Source source = Sources.of(file);
            // Strip ".gz" first so that "X.csv.gz" also becomes table X
            final Source sourceSansCsv = source.trim(".gz").trimOrNull(".csv");
            if (sourceSansCsv != null) {
                // createTable (not shown) wraps the file in a CsvTable of the configured flavor
                final Table table = createTable(source);
                builder.put(sourceSansCsv.relative(baseSource).path(), table);
            }
        }
        return builder.build();
    }

    // Removes the suffix from s if present, otherwise returns s unchanged
    private static String trim(String s, String suffix) {
        return s.endsWith(suffix) ? s.substring(0, s.length() - suffix.length()) : s;
    }
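The naming rule in createTableMap can be tried without Calcite: keep files ending in .csv or .csv.gz, then strip those suffixes to get the table name. This plain-Java sketch works on bare file names rather than Calcite's Source objects and ignores subdirectories; TableNaming is a hypothetical helper. It also shows why the case of the file name matters: the table name is the file name as-is.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper mirroring the file-name -> table-name rule of createTableMap.
class TableNaming {
    // Removes the suffix from s if present, otherwise returns s unchanged
    static String trim(String s, String suffix) {
        return s.endsWith(suffix) ? s.substring(0, s.length() - suffix.length()) : s;
    }

    // Returns table name -> file name for every .csv or .csv.gz file
    static Map<String, String> tableNames(List<String> fileNames) {
        Map<String, String> tables = new TreeMap<>();
        for (String file : fileNames) {
            String sansGz = trim(file, ".gz");
            if (sansGz.endsWith(".csv")) {
                tables.put(trim(sansGz, ".csv"), file);
            }
        }
        return tables;
    }

    public static void main(String[] args) {
        // Non-CSV files such as readme.txt are skipped
        System.out.println(tableNames(List.of("ASSET.csv", "USERINFO.csv.gz", "readme.txt")));
        // prints {ASSET=ASSET.csv, USERINFO=USERINFO.csv.gz}
    }
}
```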

5. Implement a custom Table

We now have the Schema, and each CSV data file is mapped to a Table: one CSV file corresponds to one Table.

Next we customize the Table. The core of a custom Table is defining the name and type of each field, and how to read the CSV file.

  1. First, obtain the field names and types, that is, the table structure, from the CSV file header. We define the header format ourselves, including its parsing rules, so both can be customized.
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.util.Source;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.ArrayList;
import java.util.List;

/** Base class for table that reads CSV files. */
public abstract class CsvTable extends AbstractTable {
    protected final Source source;
    protected final @Nullable RelProtoDataType protoRowType;
    private @Nullable RelDataType rowType;
    private @Nullable List<RelDataType> fieldTypes;

    /** Creates a CsvTable. */
    CsvTable(Source source, @Nullable RelProtoDataType protoRowType) {
        this.source = source;
        this.protoRowType = protoRowType;
    }

    /**
     * CsvTable extends AbstractTable, so it must implement getRowType, which returns the
     * structure of the current table. There are many kinds of Table (views, for example);
     * AbstractTable provides default implementations of some Table methods, e.g.
     * getJdbcTableType returns TABLE. For other custom requirements, implement the Table
     * interface directly. This is very similar to AbstractSchema.
     */
    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        if (protoRowType != null) {
            return protoRowType.apply(typeFactory);
        }
        if (rowType == null) {
            rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source, null);
        }
        return rowType;
    }

    /** Returns the field types of this CSV table. */
    public List<RelDataType> getFieldTypes(RelDataTypeFactory typeFactory) {
        if (fieldTypes == null) {
            fieldTypes = new ArrayList<>();
            CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source, fieldTypes);
        }
        return fieldTypes;
    }
}
    // In CsvEnumerator: deduces the row type from the "name:type" header line
    public static RelDataType deduceRowType(JavaTypeFactory typeFactory,
                                            Source source, @Nullable List<RelDataType> fieldTypes) {
        final List<RelDataType> types = new ArrayList<>();
        final List<String> names = new ArrayList<>();
        try (CSVReader reader = openCsv(source)) {
            String[] strings = reader.readNext();
            if (strings == null) {
                strings = new String[]{"EmptyFileHasNoColumns:boolean"};
            }
            for (String string : strings) {
                final String name;
                final RelDataType fieldType;
                // Simple parsing: the name is before the colon and the type is after it
                final int colon = string.indexOf(':');
                if (colon >= 0) {
                    name = string.substring(0, colon);
                    String typeString = string.substring(colon + 1);
                    Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString);
                    if (decimalMatcher.matches()) {
                        int precision = Integer.parseInt(decimalMatcher.group(1));
                        int scale = Integer.parseInt(decimalMatcher.group(2));
                        fieldType = parseDecimalSqlType(typeFactory, precision, scale);
                    } else {
                        switch (typeString) {
                            case "string":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
                                break;
                            case "boolean":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN);
                                break;
                            case "byte":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT);
                                break;
                            case "char":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR);
                                break;
                            case "short":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT);
                                break;
                            case "int":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER);
                                break;
                            case "long":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT);
                                break;
                            case "float":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL);
                                break;
                            case "double":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE);
                                break;
                            case "date":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE);
                                break;
                            case "timestamp":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP);
                                break;
                            case "time":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME);
                                break;
                            default:
                                LOGGER.warn(
                                        "Found unknown type: {} in file: {} for column: {}. Will assume the type of "
                                                + "column is string.",
                                        typeString, source.path(), name);
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
                                break;
                        }
                    }
                } else {
                    // No type given: the whole string is the field name and the type defaults to VARCHAR
                    name = string;
                    fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
                }
                names.add(name);
                types.add(fieldType);
                if (fieldTypes != null) {
                    fieldTypes.add(fieldType);
                }
            }
        } catch (IOException e) {
            // ignore
        }
        if (names.isEmpty()) {
            names.add("line");
            types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
        }
        return typeFactory.createStructType(Pair.zip(names, types));
    }
  2. Read the data in the file. With the field names and types of the table structure obtained above, the last step is to read the data itself. We write a class that implements the ScannableTable interface and its only method, scan. The method essentially reads the file and converts each line's values according to the field types obtained above.
    // In a table class that implements ScannableTable (CsvScannableTable in Calcite's CSV example)
    @Override
    public Enumerable<@Nullable Object[]> scan(DataContext root) {
        JavaTypeFactory typeFactory = root.getTypeFactory();
        final List<RelDataType> fieldTypes = getFieldTypes(typeFactory);
        final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size());
        final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
        return new AbstractEnumerable<@Nullable Object[]>() {
            @Override
            public Enumerator<@Nullable Object[]> enumerator() {
                // Return our custom class for reading the data
                return new CsvEnumerator<>(source, cancelFlag, false, null,
                        CsvEnumerator.arrayConverter(fieldTypes, fields, false));
            }
        };
    }
    // In CsvEnumerator<E>, which implements Calcite's Enumerator<E>
    public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream,
                         @Nullable String @Nullable [] filterValues, RowConverter<E> rowConverter) {
        this.cancelFlag = cancelFlag;
        this.rowConverter = rowConverter;
        this.filterValues = filterValues == null ? null
                : ImmutableNullableList.copyOf(filterValues);
        try {
            this.reader = openCsv(source);
            // Skip the first line: it is the header that defines the names and types
            this.reader.readNext(); // skip header row
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // CsvEnumerator must implement Calcite's own iterator interface (Enumerator), which has
    // the methods current and moveNext: current returns the record under the cursor, and
    // moveNext moves the cursor to the next record. The official example also defines a type
    // converter that converts the values in the CSV file to the types declared in the file
    // header; we have to implement this ourselves.
    @Override
    public E current() {
        return castNonNull(current);
    }

    @Override
    public boolean moveNext() {
        try {
        outer:
            for (; ; ) {
                if (cancelFlag.get()) {
                    return false;
                }
                final String[] strings = reader.readNext();
                if (strings == null) {
                    current = null;
                    reader.close();
                    return false;
                }
                if (filterValues != null) {
                    for (int i = 0; i < strings.length; i++) {
                        String filterValue = filterValues.get(i);
                        if (filterValue != null) {
                            if (!filterValue.equals(strings[i])) {
                                // Row does not match the pushed-down filter: read the next one
                                continue outer;
                            }
                        }
                    }
                }
                current = rowConverter.convertRow(strings);
                return true;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    // In the row converter: converts one CSV string to the Java value of its declared SQL type
    protected @Nullable Object convert(@Nullable RelDataType fieldType, @Nullable String string) {
        if (fieldType == null || string == null) {
            return string;
        }
        switch (fieldType.getSqlTypeName()) {
            case BOOLEAN:
                if (string.length() == 0) {
                    return null;
                }
                return Boolean.parseBoolean(string);
            case TINYINT:
                if (string.length() == 0) {
                    return null;
                }
                return Byte.parseByte(string);
            case SMALLINT:
                if (string.length() == 0) {
                    return null;
                }
                return Short.parseShort(string);
            case INTEGER:
                if (string.length() == 0) {
                    return null;
                }
                return Integer.parseInt(string);
            case BIGINT:
                if (string.length() == 0) {
                    return null;
                }
                return Long.parseLong(string);
            case FLOAT:
                if (string.length() == 0) {
                    return null;
                }
                return Float.parseFloat(string);
            case DOUBLE:
                if (string.length() == 0) {
                    return null;
                }
                return Double.parseDouble(string);
            case DECIMAL:
                if (string.length() == 0) {
                    return null;
                }
                return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string);
            case DATE:
                if (string.length() == 0) {
                    return null;
                }
                try {
                    Date date = TIME_FORMAT_DATE.parse(string);
                    // DATE is represented as the number of days since the epoch
                    return (int) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY);
                } catch (ParseException e) {
                    return null;
                }
            case TIME:
                if (string.length() == 0) {
                    return null;
                }
                try {
                    Date date = TIME_FORMAT_TIME.parse(string);
                    // TIME is represented as milliseconds since midnight
                    return (int) date.getTime();
                } catch (ParseException e) {
                    return null;
                }
            case TIMESTAMP:
                if (string.length() == 0) {
                    return null;
                }
                try {
                    Date date = TIME_FORMAT_TIMESTAMP.parse(string);
                    // TIMESTAMP is represented as milliseconds since the epoch
                    return date.getTime();
                } catch (ParseException e) {
                    return null;
                }
            case VARCHAR:
            default:
                return string;
        }
    }
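The scan, CsvEnumerator and convert pieces together implement a cursor over the file. The plain-Java sketch below reproduces that protocol over in-memory lines: it skips the header, stops when a cancel flag is set, and converts each cell according to its declared type, with empty typed cells becoming null. LineEnumerator is a hypothetical class, not Calcite's Enumerator interface; it supports only a few types, splits on commas without CSV quoting, and uses made-up sample rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical enumerator using the same current()/moveNext() protocol as CsvEnumerator.
class LineEnumerator {
    private final Iterator<String> lines;
    private final List<String> types = new ArrayList<>();
    private final AtomicBoolean cancelFlag;
    private Object[] current;

    LineEnumerator(List<String> csvLines, AtomicBoolean cancelFlag) {
        this.lines = csvLines.iterator();
        this.cancelFlag = cancelFlag;
        // The first line is the "name:type" header: keep the types, never emit it as a row
        if (lines.hasNext()) {
            for (String cell : lines.next().split(",")) {
                int colon = cell.indexOf(':');
                types.add(colon < 0 ? "string" : cell.substring(colon + 1));
            }
        }
    }

    // Row under the cursor; null before the first moveNext() and after the end
    Object[] current() {
        return current;
    }

    // Advances the cursor; returns false at the end of input or when the query is cancelled
    boolean moveNext() {
        if (cancelFlag.get() || !lines.hasNext()) {
            current = null;
            return false;
        }
        String[] cells = lines.next().split(",", -1);
        Object[] row = new Object[cells.length];
        for (int i = 0; i < cells.length; i++) {
            String type = i < types.size() ? types.get(i) : "string";
            row[i] = convert(type, cells[i]);
        }
        current = row;
        return true;
    }

    // Empty cells become null for typed columns, mirroring the convert method above
    static Object convert(String type, String s) {
        if (s.isEmpty() && !type.equals("string")) {
            return null;
        }
        return switch (type) {
            case "int" -> Integer.valueOf(s);
            case "long" -> Long.valueOf(s);
            case "double" -> Double.valueOf(s);
            case "boolean" -> Boolean.valueOf(s);
            default -> s;
        };
    }

    public static void main(String[] args) {
        // Made-up sample data: a typed header plus two rows, the second with an empty AGE
        List<String> csv = List.of("NAME:string,AGE:int", "aixiaoxian,18", "xiaobai,");
        LineEnumerator e = new LineEnumerator(csv, new AtomicBoolean(false));
        while (e.moveNext()) {
            System.out.println(Arrays.toString(e.current()));
        }
        // prints [aixiaoxian, 18] and then [xiaobai, null]
    }
}
```

Keeping the cursor state in the object, rather than materializing all rows, is what lets the engine stop early, for instance when the cancel flag is set.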

6. Finally

At this point we have everything we need: the database, table names, field names and field types. Next, we write SQL statements to query our data files.

Create a few test data files. In the project structure above, for example, I created two CSV files, USERINFO.csv and ASSET.csv, and then a test class.

Run it, and you can query the data directly with SQL statements.

import org.apache.calcite.util.Sources;
import java.sql.*;
import java.util.Properties;

public class Test {
    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        Statement statement = null;
        try {
            Properties info = new Properties();
            // model.json is loaded from the classpath (the resources directory)
            info.put("model", Sources.of(Test.class.getResource("/model.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            print(statement.executeQuery("select * from asset"));
            print(statement.executeQuery("select * from userinfo"));
            print(statement.executeQuery("select age from userinfo where name = 'aixiaoxian'"));
            print(statement.executeQuery("select * from userinfo where age > 60"));
            print(statement.executeQuery("select * from userinfo where name like 'a%'"));
        } finally {
            // Release JDBC resources
            if (statement != null) {
                statement.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    // Prints each row as comma-separated column values
    private static void print(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i));
                if (i < columnCount) {
                    System.out.print(", ");
                } else {
                    System.out.println();
                    break;
                }
            }
        }
        resultSet.close();
    }
}

Query results:

I ran into two pitfalls while testing; keep them in mind when you experiment on your own.

  1. By default, Calcite converts all table names and column names in your SQL statements to uppercase. Since the table name is the CSV file name (the same applies to other file types) unless you customize the rules, your file names must be written in uppercase.
  2. Calcite has some reserved keywords that cannot be used as table names; otherwise the query fails. For example, the user.csv I created at the beginning could not be found, and renaming it to USERINFO fixed the problem. This is similar to MySQL's built-in keywords, and it can also be changed through custom configuration.
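Both pitfalls come from the way identifiers are handled. The plain-Java sketch below imitates the default behavior described above: unquoted identifiers are folded to uppercase, double-quoted identifiers keep their case, and a bare reserved word cannot name a table. The reserved-word set is a tiny hypothetical subset for illustration (Calcite's real list is much longer), and Identifiers is a hypothetical class. In standard SQL, quoting a name such as "user" is the usual way around a keyword clash.

```java
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch of default identifier handling; not Calcite's parser.
class Identifiers {
    // Tiny illustrative subset of SQL reserved words
    private static final Set<String> RESERVED = Set.of("SELECT", "FROM", "WHERE", "USER", "TABLE");

    static boolean isQuoted(String identifier) {
        return identifier.length() >= 2 && identifier.startsWith("\"") && identifier.endsWith("\"");
    }

    // Unquoted identifiers are folded to uppercase; "quoted" ones keep their exact case
    static String normalize(String identifier) {
        if (isQuoted(identifier)) {
            return identifier.substring(1, identifier.length() - 1);
        }
        return identifier.toUpperCase(Locale.ROOT);
    }

    // A reserved word can only name a table when it is quoted
    static boolean usableAsTableName(String identifier) {
        return isQuoted(identifier) || !RESERVED.contains(normalize(identifier));
    }

    public static void main(String[] args) {
        System.out.println(normalize("userinfo"));     // USERINFO: matches USERINFO.csv
        System.out.println(normalize("\"userinfo\"")); // userinfo: would need userinfo.csv
        System.out.println(usableAsTableName("user")); // false
    }
}
```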


Querying MySQL through Calcite

  1. First, prepare what Calcite needs: the database, table names, field names and field types.

If the data source is MySQL, we don't need to define these in the Java service: they are created directly in the MySQL client. Here I create two tables for testing, mirroring our CSV files.

  `NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,

  `NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  `MONEY` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL
  2. The SchemaFactory and Schema classes from the CSV example do not need to be written here, because Calcite provides a JDBC adapter, which works with MySQL, out of the box.
  3. So all we really need to do is give Calcite the JDBC connection information, which is again defined in a model file (mysqlmodel.json in the test below).

{
  "version": "1.0",
  "defaultSchema": "Demo",
  "schemas": [
    {
      "name": "Demo",
      "type": "custom",
      // Calcite's built-in SchemaFactory; internally it follows the same process we implemented above (its source is shown below)
      "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
      "operand": {
        // I am using MySQL 8 or above, so note the driver class name here
        "jdbcDriver": "com.mysql.cj.jdbc.Driver",
        "jdbcUrl": "jdbc:mysql://localhost:3306/irving",
        "jdbcUser": "root",
        "jdbcPassword": "123456"
      }
    }
  ]
}
  4. Add the MySQL driver to the project:

  <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.30</version>
  </dependency>
  5. Write the test class, and that completes the whole feature.
import org.apache.calcite.util.Sources;
import java.sql.*;
import java.util.Properties;

public class TestMysql {
    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        Statement statement = null;
        try {
            Properties info = new Properties();
            info.put("model", Sources.of(TestMysql.class.getResource("/mysqlmodel.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            statement.executeUpdate("insert into userinfo1 values ('xxx', 12)");
            print(statement.executeQuery("select * from asset"));
            print(statement.executeQuery("select * from userinfo1"));
            print(statement.executeQuery("select age from userinfo1 where name = 'aixiaoxian'"));
            print(statement.executeQuery("select * from userinfo1 where age > 60"));
            print(statement.executeQuery("select * from userinfo1 where name like 'a%'"));
        } finally {
            // Release JDBC resources
            if (statement != null) {
                statement.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    // Prints each row as comma-separated column values
    private static void print(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i));
                if (i < columnCount) {
                    System.out.print(", ");
                } else {
                    System.out.println();
                    break;
                }
            }
        }
        resultSet.close();
    }
}

Query results:

How the MySQL integration works

In the model.json file above we specified the class org.apache.calcite.adapter.jdbc.JdbcSchema$Factory; let's look at its code.

This class combines the Factory and the Schema: the factory's create method is called to create the schema, which is exactly the process we implemented ourselves above.

JdbcSchema also implements the Schema interface, so it implements the getTable method as well. (We did the same above, where each table's structure, field names and types came from the CSV file header.) JdbcSchema implements it by connecting to the MySQL server, querying the metadata, and wrapping that information in the object format Calcite requires.

The same two pitfalls as in the CSV example apply here: capitalization and reserved keywords.

  // JdbcSchema.Factory#create delegates to this method
  public static JdbcSchema create(
      SchemaPlus parentSchema,
      String name,
      Map<String, Object> operand) {
    DataSource dataSource;
    try {
      final String dataSourceName = (String) operand.get("dataSource");
      if (dataSourceName != null) {
        dataSource =
            AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);
      } else {
        // We end up here: these are the JDBC connection settings from model.json,
        // from which a DataSource is created
        final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");
        final String jdbcDriver = (String) operand.get("jdbcDriver");
        final String jdbcUser = (String) operand.get("jdbcUser");
        final String jdbcPassword = (String) operand.get("jdbcPassword");
        dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
      }
    } catch (Exception e) {
      throw new RuntimeException("Error while reading dataSource", e);
    }
    String jdbcCatalog = (String) operand.get("jdbcCatalog");
    String jdbcSchema = (String) operand.get("jdbcSchema");
    String sqlDialectFactory = (String) operand.get("sqlDialectFactory");
    if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {
      return JdbcSchema.create(
          parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
    } else {
      SqlDialectFactory factory = AvaticaUtils.instantiatePlugin(
          SqlDialectFactory.class, sqlDialectFactory);
      return JdbcSchema.create(
          parentSchema, name, dataSource, factory, jdbcCatalog, jdbcSchema);
    }
  }

  @Override public @Nullable Table getTable(String name) {
    return getTableMap(false).get(name);
  }

  private synchronized ImmutableMap<String, JdbcTable> getTableMap(
      boolean force) {
    if (force || tableMap == null) {
      tableMap = computeTables();
    }
    return tableMap;
  }

  private ImmutableMap<String, JdbcTable> computeTables() {
    Connection connection = null;
    ResultSet resultSet = null;
    try {
      connection = dataSource.getConnection();
      final Pair<@Nullable String, @Nullable String> catalogSchema = getCatalogSchema(connection);
      final String catalog = catalogSchema.left;
      final String schema = catalogSchema.right;
      final Iterable<MetaImpl.MetaTable> tableDefs;
      Foo threadMetadata = THREAD_METADATA.get();
      if (threadMetadata != null) {
        tableDefs = threadMetadata.apply(catalog, schema);
      } else {
        final List<MetaImpl.MetaTable> tableDefList = new ArrayList<>();
        // Get the database metadata
        final DatabaseMetaData metaData = connection.getMetaData();
        resultSet = metaData.getTables(catalog, schema, null, null);
        while (resultSet.next()) {
          // Catalog name, schema name, table name and table type of each table
          final String catalogName = resultSet.getString(1);
          final String schemaName = resultSet.getString(2);
          final String tableName = resultSet.getString(3);
          final String tableTypeName = resultSet.getString(4);
          tableDefList.add(
              new MetaImpl.MetaTable(catalogName, schemaName, tableName,
                  tableTypeName));
        }
        tableDefs = tableDefList;
      }
      final ImmutableMap.Builder<String, JdbcTable> builder =
          ImmutableMap.builder();
      for (MetaImpl.MetaTable tableDef : tableDefs) {
        final String tableTypeName2 =
            tableDef.tableType == null
            ? null
            : tableDef.tableType.toUpperCase(Locale.ROOT).replace(' ', '_');
        final TableType tableType =
            Util.enumVal(TableType.OTHER, tableTypeName2);
        if (tableType == TableType.OTHER && tableTypeName2 != null) {
          System.out.println("Unknown table type: " + tableTypeName2);
        }
        // Finally, each table is wrapped in a JdbcTable object
        final JdbcTable table =
            new JdbcTable(this, tableDef.tableCat, tableDef.tableSchem,
                tableDef.tableName, tableType);
        builder.put(tableDef.tableName, table);
      }
      return builder.build();
    } catch (SQLException e) {
      throw new RuntimeException(
          "Exception while reading tables", e);
    } finally {
      close(connection, null, resultSet);
    }
  }
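One small detail of computeTables is how the table type reported by the JDBC driver becomes a TableType: the string is upper-cased, spaces become underscores, and anything unrecognized (or null) falls back to OTHER via Util.enumVal. The plain-Java sketch below reproduces that rule; TableKind is a hypothetical stand-in for Calcite's TableType (which has many more constants), and a try/catch around Enum.valueOf plays the role of Util.enumVal.

```java
import java.util.Locale;

// Hypothetical stand-in for a few of Calcite's TableType constants.
enum TableKind { TABLE, VIEW, SYSTEM_TABLE, OTHER }

class TableTypes {
    // Same normalization as computeTables: upper-case, spaces to underscores, OTHER as fallback
    static TableKind classify(String jdbcTableType) {
        if (jdbcTableType == null) {
            return TableKind.OTHER;
        }
        String normalized = jdbcTableType.toUpperCase(Locale.ROOT).replace(' ', '_');
        try {
            return TableKind.valueOf(normalized);
        } catch (IllegalArgumentException e) {
            return TableKind.OTHER;
        }
    }

    public static void main(String[] args) {
        System.out.println(classify("SYSTEM TABLE")); // SYSTEM_TABLE
        System.out.println(classify("view"));         // VIEW
        System.out.println(classify("base table"));   // OTHER
    }
}
```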

SQL execution process

That covers the two simple cases. To finish, I will outline Calcite's overall architecture and the full SQL execution process.

The whole process is: SQL parsing (Parser) => SQL validation (Validator) => query optimization (Optimizer) => SQL generation => SQL execution

SQL Parser

Every SQL statement must be parsed by a SQL parser before it is executed. The parser turns the tokens of the SQL text into an abstract syntax tree (AST) whose nodes are SqlNodes; in other words, this step is SQL text => SqlNode.

Our earlier demos did not define a custom parser, because Calcite uses its own default parser (SqlParserImpl).


SqlNode is the core of the whole parsing step. For example, as the diagram shows, each part of the statement introduced by a keyword such as select, from or where is actually a SqlNode.
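To illustrate SQL text => tree => SQL text on a small scale, here is a toy parser for a single `select ... from ... [where ...]` statement. It is only a sketch: SelectNode is a hypothetical record standing in for SqlSelect, the parsing is a regular expression over keywords rather than a real grammar (no joins, subqueries, or keywords inside string literals), and toSql imitates unparsing a SqlNode back to SQL.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical tree node playing the role of SqlSelect: one child per clause.
record SelectNode(List<String> selectList, String from, String where) {
    // "Unparse" the tree back into SQL text
    String toSql() {
        String sql = "SELECT " + String.join(", ", selectList) + " FROM " + from;
        return where == null ? sql : sql + " WHERE " + where;
    }
}

class ToyParser {
    // select <list> from <table> [where <condition>], case-insensitive
    private static final Pattern SELECT = Pattern.compile(
            "(?is)\\s*select\\s+(.+?)\\s+from\\s+(\\S+)(?:\\s+where\\s+(.+?))?\\s*");

    static SelectNode parse(String sql) {
        Matcher m = SELECT.matcher(sql);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported statement: " + sql);
        }
        List<String> columns = Arrays.stream(m.group(1).split(","))
                .map(String::trim)
                .toList();
        return new SelectNode(columns, m.group(2), m.group(3));
    }

    public static void main(String[] args) {
        SelectNode node = parse("select name, age from userinfo where age > 60");
        System.out.println(node.selectList()); // [name, age]
        System.out.println(node.where());      // age > 60
        System.out.println(node.toSql());      // SELECT name, age FROM userinfo WHERE age > 60
    }
}
```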

The parserConfig method mainly sets the parameters for the SqlParserFactory. For example, the capitalization pitfall I ran into during local testing, mentioned above, can be configured here.

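Setting caseSensitive to false makes Calcite match table and column names regardless of case, so lower-case names no longer cause lookup failures. Unquoted identifiers are still converted to uppercase by default; that behavior is controlled separately by the unquotedCasing setting. Other parameters can be configured as needed.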

SQL Validator

After the parser, the SQL statement goes through the validator. Note that the parser only checks that the statement is syntactically well formed; it does not check whether the statement is semantically correct.

The real checking happens in the validator, which verifies that the queried tables and fields exist and that the types match. This process is fairly complex; the default validator is SqlValidatorImpl.
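The existence checks the validator performs can be sketched as catalog lookups. ToyValidator below is hypothetical: its catalog is a map from table name to column types, it only checks that the table and the selected columns exist (no type checking), and it matches names exactly, which is why the uppercase folding done earlier matters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical validator: checks table and column existence against a catalog.
class ToyValidator {
    // table name -> (column name -> SQL type)
    private final Map<String, Map<String, String>> catalog;

    ToyValidator(Map<String, Map<String, String>> catalog) {
        this.catalog = catalog;
    }

    // Returns error messages; an empty list means the query passed these checks
    List<String> validate(String table, List<String> columns) {
        List<String> errors = new ArrayList<>();
        Map<String, String> tableColumns = catalog.get(table);
        if (tableColumns == null) {
            errors.add("Object '" + table + "' not found");
            return errors;
        }
        for (String column : columns) {
            if (!column.equals("*") && !tableColumns.containsKey(column)) {
                errors.add("Column '" + column + "' not found in table '" + table + "'");
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        ToyValidator v = new ToyValidator(Map.of(
                "USERINFO", Map.of("NAME", "VARCHAR", "AGE", "INTEGER")));
        System.out.println(v.validate("USERINFO", List.of("NAME", "AGE"))); // []
        System.out.println(v.validate("USERINFO", List.of("SALARY")));      // [Column 'SALARY' not found in table 'USERINFO']
        System.out.println(v.validate("userinfo", List.of("NAME")));        // [Object 'userinfo' not found]
    }
}
```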

Query optimization

The query is optimized in terms of relational algebra (operators such as projection and Cartesian product). Calcite provides many built-in optimizers, and you can also implement your own.
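As one concrete example of a relational-algebra rewrite: a filter that references only one side of a Cartesian product can be applied to that side before the product, giving the same rows while building fewer pairs. Calcite's planner has rules in this spirit, such as FilterJoinRule; the plain-Java sketch below only demonstrates the equivalence on in-memory lists, and PushDownDemo and its sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical demo: pushing a filter below a Cartesian product gives the same rows.
class PushDownDemo {
    record UserAsset(String user, String asset) {}

    // Cartesian product: every user paired with every asset
    static List<UserAsset> product(List<String> users, List<String> assets) {
        List<UserAsset> out = new ArrayList<>();
        for (String u : users) {
            for (String a : assets) {
                out.add(new UserAsset(u, a));
            }
        }
        return out;
    }

    // Plan 1: build all pairs, then filter on the user column
    static List<UserAsset> filterAfterProduct(List<String> users, List<String> assets,
                                              Predicate<String> userFilter) {
        return product(users, assets).stream()
                .filter(p -> userFilter.test(p.user()))
                .toList();
    }

    // Plan 2: filter the users first, then build pairs (fewer pairs are created)
    static List<UserAsset> filterBeforeProduct(List<String> users, List<String> assets,
                                               Predicate<String> userFilter) {
        return product(users.stream().filter(userFilter).toList(), assets);
    }

    public static void main(String[] args) {
        List<String> users = List.of("aixiaoxian", "xiaobai", "adong");
        List<String> assets = List.of("house", "car");
        Predicate<String> startsWithA = u -> u.startsWith("a");
        List<UserAsset> late = filterAfterProduct(users, assets, startsWithA);   // builds 6 pairs
        List<UserAsset> early = filterBeforeProduct(users, assets, startsWithA); // builds 4 pairs
        System.out.println(late.equals(early)); // true
    }
}
```

The rewrite is only valid because the predicate reads nothing but the user column; a filter that compares user and asset has to stay above the product.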


Calcite does not include a storage layer, so it provides an adapter mechanism for accessing external data stores or storage engines.
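In Calcite, an adapter typically exposes a Schema whose tables implement an interface such as ScannableTable. That interface has a `scan(DataContext)` method that returns rows as `Object[]`. The dependency-free sketch below only mimics that idea, using hypothetical names (`ToyTable`, `InMemoryTable`, `CsvLinesTable`, `ToyEngine`). The engine code works only against the interface, and each adapter decides where the rows come from.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical adapter contract, loosely modeled on the idea behind Calcite's ScannableTable
interface ToyTable {
    List<String> columnNames();

    Iterable<Object[]> scan();
}

// Adapter 1: rows already held in memory
final class InMemoryTable implements ToyTable {
    private final List<String> columns;
    private final List<Object[]> rows;

    InMemoryTable(List<String> columns, List<Object[]> rows) {
        this.columns = columns;
        this.rows = rows;
    }

    public List<String> columnNames() { return columns; }

    public Iterable<Object[]> scan() { return rows; }
}

// Adapter 2: rows parsed from comma-separated text lines, header line first
final class CsvLinesTable implements ToyTable {
    private final List<String> lines;

    CsvLinesTable(List<String> lines) { this.lines = lines; }

    public List<String> columnNames() { return List.of(lines.get(0).split(",")); }

    public Iterable<Object[]> scan() {
        List<Object[]> rows = new ArrayList<>();
        for (String line : lines.subList(1, lines.size())) {
            rows.add(line.split(",")); // each field stays a String
        }
        return rows;
    }
}

// "Engine" side: it only knows the ToyTable interface, not the storage behind it
final class ToyEngine {
    static int count(ToyTable table, Predicate<Object[]> where) {
        int n = 0;
        for (Object[] row : table.scan()) {
            if (where.test(row)) {
                n++;
            }
        }
        return n;
    }
}
```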

Finally, an advanced example

According to the official website, Calcite's Kafka adapter is still experimental and changes to its public API and usage are expected, so it is not yet as convenient to use as the MySQL integration above. Here I will show you a way to implement it yourself, so that you can query the data and other information in a Kafka topic directly through SQL.

Internally, we have integrated this approach to provide KSQL-like capabilities, and the queries work as expected.

As in the steps above, we need to prepare the schema (library), table name, column names, column types, and the data source. The data source is the new part here.

  1. Custom SQL parsing. The earlier examples did not customize parsing. It is needed here because I have to parse the `partition` condition in the SQL `where` clause dynamically.
  • Configure the parser. This is the case-sensitivity configuration described in the previous case.
  • Create a parser, using the default SqlParserImpl.
  • Parse the statement to generate the AST. Based on the generated SqlNode, we can do business-specific validation and extract parameters.
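In the article, this extraction happens inside kafkaService.parseSql on the Calcite AST (SqlNode). As a dependency-free illustration of the same parameter extraction, here is a regex-based sketch that pulls the partition list out of a condition such as `` `partition` in (0,1,2) ``. This is a swapped-in, simpler technique and is more fragile than walking the SqlNode tree. For example, it ignores OR, NOT and comments. `PartitionExtractor` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PartitionExtractor {
    // Matches `partition` in (0, 1, 2), with or without back-ticks, case-insensitively
    private static final Pattern PARTITION_IN =
            Pattern.compile("`?\\bpartition\\b`?\\s+in\\s*\\(([^)]*)\\)", Pattern.CASE_INSENSITIVE);

    // Returns the partition numbers listed in the IN clause, or an empty list if there is none
    static List<Integer> partitions(String sql) {
        List<Integer> result = new ArrayList<>();
        Matcher m = PARTITION_IN.matcher(sql);
        if (m.find()) {
            for (String part : m.group(1).split(",")) {
                String trimmed = part.trim();
                if (!trimmed.isEmpty()) {
                    result.add(Integer.parseInt(trimmed));
                }
            }
        }
        return result;
    }
}
```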
  2. The adapter fetches the data from the source
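    import java.time.Duration;
    import java.util.*;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class KafkaConsumerAdapter {
        // Read roughly the latest 500 messages of each partition
        private static final long TAIL_SIZE = 500;
        // Stop after this many empty polls so an empty topic cannot hang the query
        private static final int MAX_EMPTY_POLLS = 50;
        public static List<KafkaResult> executor(KafkaSqlInfo kafkaSql) {
            Properties props = new Properties();
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSql.getSeeds());
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            List<KafkaResult> results = new ArrayList<>();
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // One TopicPartition per partition parsed from the WHERE clause
                List<TopicPartition> topics = new ArrayList<>();
                for (Integer partition : kafkaSql.getPartition()) {
                    topics.add(new TopicPartition(kafkaSql.getTableName(), partition));
                }
                // Partitions must be assigned before seek() can be used on them
                consumer.assign(topics);
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topics);
                for (TopicPartition tp : topics) {
                    // Start TAIL_SIZE messages before the end, or at offset 0 for short partitions
                    consumer.seek(tp, Math.max(0L, endOffsets.get(tp) - TAIL_SIZE));
                }
                int emptyPolls = 0;
                // As before, stop after the first poll that returns records
                while (results.isEmpty() && emptyPolls < MAX_EMPTY_POLLS) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    if (records.isEmpty()) {
                        emptyPolls++;
                    }
                    for (ConsumerRecord<String, String> record : records) {
                        // Convert to the object collection I defined; these setters are
                        // assumed, adapt them to the fields of your KafkaResult class
                        KafkaResult result = new KafkaResult();
                        result.setPartition(record.partition());
                        result.setMsg(record.value());
                        results.add(result);
                    }
                }
            }
            return results;
        }
    }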
  3. Execute the query to get the result we want.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.*;
    import org.apache.calcite.config.Lex;
    import org.apache.calcite.jdbc.Driver;
    // Assumption: JSONObject/JSONArray come from fastjson
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;

    public class TestKafka {
        public static void main(String[] args) throws Exception {
            KafkaService kafkaService = new KafkaService();
            // Put the parsed parameters into my own KafkaSqlInfo object
            KafkaSqlInfo sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");
            // The adapter fetches the data, polling Kafka according to sqlInfo
            List<KafkaResult> results = KafkaConsumerAdapter.executor(sqlInfo);
            // Execute the query
            query(sqlInfo.getTableName(), results, sqlInfo.getSql());
            sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) AND msg like '%account%' limit 1000 ");
            results = KafkaConsumerAdapter.executor(sqlInfo);
            query(sqlInfo.getTableName(), results, sqlInfo.getSql());
            sqlInfo = kafkaService.parseSql("select count(*) AS addad from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");
            results = KafkaConsumerAdapter.executor(sqlInfo);
            query(sqlInfo.getTableName(), results, sqlInfo.getSql());
        }

        private static void query(String tableName, List<KafkaResult> results,
                                  String sql) throws Exception {
            // Create model.json, set my SchemaFactory and the schema (library) name
            String model = createTempJson();
            // Register my table structure: table name, column names and types
            KafkaTableSchema.generateSchema(tableName, results);
            Properties info = new Properties();
            info.setProperty("lex", Lex.JAVA.toString());
            // Connect and execute; try-with-resources closes everything afterwards
            try (Connection connection = DriverManager.getConnection(
                         Driver.CONNECT_STRING_PREFIX + "model=inline:" + model, info);
                 Statement st = connection.createStatement();
                 ResultSet result = st.executeQuery(sql)) {
                ResultSetMetaData rsmd = result.getMetaData();
                List<Map<String, Object>> ret = new ArrayList<>();
                while (result.next()) {
                    // LinkedHashMap keeps the columns in query order
                    Map<String, Object> map = new LinkedHashMap<>();
                    for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                        map.put(rsmd.getColumnName(i), result.getString(i));
                    }
                    ret.add(map);
                }
                System.out.println(ret);
            }
        }

        // Prints each row as comma-separated values
        private static void print(ResultSet resultSet) throws SQLException {
            final ResultSetMetaData metaData = resultSet.getMetaData();
            final int columnCount = metaData.getColumnCount();
            while (resultSet.next()) {
                for (int i = 1; ; i++) {
                    System.out.print(resultSet.getString(i));
                    if (i < columnCount) {
                        System.out.print(", ");
                    } else {
                        System.out.println();
                        break;
                    }
                }
            }
        }

        private static String createTempJson() {
            JSONObject object = new JSONObject();
            object.put("version", "1.0");
            object.put("defaultSchema", "QAKAFKA");
            JSONArray array = new JSONArray();
            JSONObject tmp = new JSONObject();
            tmp.put("name", "QAKAFKA");
            tmp.put("type", "custom");
            tmp.put("factory", "kafka.KafkaSchemaFactory");
            // The schema must be added to the array, otherwise "schemas" stays empty
            array.add(tmp);
            object.put("schemas", array);
            return object.toJSONString();
        }
    }
  • Generate a temporary model.json. Previously it was a file; now it is a text string passed with the model=inline: pattern.
  • Register my table structure (table name, column names, column types, etc.) in memory, and also put the data fetched by the adapter into the table.
  • Get the connection, execute the query, perfect!
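For reference, the dependency-free sketch below assembles the same inline model as `createTempJson()` by hand instead of with JSONObject. It uses the schema name QAKAFKA and the factory class kafka.KafkaSchemaFactory from the code above. The helper name `InlineModel` is hypothetical. The prefix "jdbc:calcite:" is the value of Calcite's Driver.CONNECT_STRING_PREFIX. Note that the schema object has to sit inside the `schemas` array for Calcite to find the factory.

```java
final class InlineModel {
    // Same value as Calcite's org.apache.calcite.jdbc.Driver.CONNECT_STRING_PREFIX
    static final String CONNECT_STRING_PREFIX = "jdbc:calcite:";

    // Builds {"version":"1.0","defaultSchema":NAME,"schemas":[{"name":NAME,"type":"custom","factory":FACTORY}]}
    // Assumes the name and factory contain no characters that need JSON escaping
    static String json(String schemaName, String factoryClass) {
        return "{\"version\":\"1.0\","
                + "\"defaultSchema\":\"" + schemaName + "\","
                + "\"schemas\":[{"
                + "\"name\":\"" + schemaName + "\","
                + "\"type\":\"custom\","
                + "\"factory\":\"" + factoryClass + "\"}]}";
    }

    // JDBC URL that embeds the model directly instead of pointing at a model.json file
    static String url(String schemaName, String factoryClass) {
        return CONNECT_STRING_PREFIX + "model=inline:" + json(schemaName, factoryClass);
    }
}
```

The `query()` method above passes a URL of this shape to DriverManager.getConnection, together with the `lex` property.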