
ksqlDB Connector for Kafka (Preview)

ksqlDB is a database purpose-built for creating stream processing applications on top of Apache Kafka. It provides real-time data processing and analytics through an SQL-like language, lets users work with data as streams, tables, or materialized views, and is integrated directly with Kafka topics. This connector uses ksqlDB to give SQL access to Apache Kafka streams: it can run both pull and push SELECT queries, as well as other statements such as CREATE and DROP.

Type Name

ksqldb

Connection Properties

Template name: ksqldb

Appropriate translator name: ksqldb

Properties:

  • host (default: localhost)
  • port (default: none; the examples below use 8088, the standard ksqlDB server port)
  • min-pool-size (default: 2)
  • max-pool-size (default: 70)
  • cloudAgent (default: FALSE)

Data Source Creation

A data source can be created with the following SQL statements:

SQL
CALL SYSADMIN.createConnection('ksqldb_source','ksqldb','host=localhost,port=8088') ;;
CALL SYSADMIN.createDatasource('ksqldb_source','ksqldb','','supportsNativeQueries=true') ;;
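The pool properties from the list above can be appended to the same comma-separated connection string. The following is a sketch, assuming a ksqlDB server on localhost:8088 and that min-pool-size and max-pool-size are accepted alongside host and port; the values 5 and 20 are illustrative only, and the call would replace the createConnection call above rather than run in addition to it:

SQL
-- Illustrative pool sizes; adjust to the expected number of concurrent queries
CALL SYSADMIN.createConnection('ksqldb_source','ksqldb','host=localhost,port=8088,min-pool-size=5,max-pool-size=20') ;;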

Translator Properties

  • streamTimeout (default: PT5S, i.e. 5 seconds): how long a native() call of type STREAM (a ksqlDB push query) keeps fetching data before it returns. Values use the ISO-8601 duration format, for example 'PT5S' for 5 seconds or 'PT8M' for 8 minutes.
  • supportsNativeQueries (default: FALSE): makes the native() system procedure available, so that native ksqlDB statements can be passed directly to the ksqlDB server.
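Translator properties are passed in the fourth argument of SYSADMIN.createDatasource as a comma-separated list, as in the data source creation example. As a sketch, assuming the connection 'ksqldb_source' already exists, the following enables native queries and lets each push query run for 30 seconds (PT30S is an arbitrary example value):

SQL
-- Enable native() and extend the push-query window from the default PT5S to 30 seconds
CALL SYSADMIN.createDatasource('ksqldb_source','ksqldb','','supportsNativeQueries=true,streamTimeout=PT30S') ;;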

Usage

The connector supports two kinds of queries: regular SQL and native queries. Regular SQL is limited to SELECT and INSERT statements, and SELECT statements are translated internally into ksqlDB pull queries. Use native queries for ksqlDB push queries, for CREATE and DROP statements, and for any statement that uses ksqlDB-specific functions.
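Because regular SQL also covers INSERT, rows can be written without going through native(). A minimal sketch, assuming the TEST_STREAM stream from the native query examples has already been created and is visible in the metadata of the ksqldb_source data source under that name (the quoted "schema.table" naming is an assumption):

SQL
-- Write one row into an existing ksqlDB stream through regular SQL
INSERT INTO "ksqldb_source.TEST_STREAM" ("INT_COLUMN", "STR_COLUMN") VALUES (1, 'first row') ;;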

SELECT Queries

In CData Virtuality, SELECT queries against a ksqlDB data source behave like regular SELECT queries, but each one retrieves the entire content of a ksqlDB materialized view or stream through a pull query. For example:

SQL
SELECT "LOGGER", "LEVEL", "TIME", "MESSAGE" FROM "ksqldb_source.KSQL_PROCESSING_LOG" ;;
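Since LIMIT is the only clause pushed down to ksqlDB (see Limitations), adding it reduces what the pull query returns; other clauses such as WHERE are then presumably evaluated by CData Virtuality after the rows are fetched. A sketch, assuming the processing log is exposed through the ksqldb_source data source created above:

SQL
-- Only the LIMIT is sent to ksqlDB; the row count is capped at the source
SELECT "LOGGER", "LEVEL", "MESSAGE" FROM "ksqldb_source.KSQL_PROCESSING_LOG" LIMIT 10 ;;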

Native Queries

The native() procedure executes queries written in native ksqlDB syntax. It currently supports three types of native query, selected by its second (type) argument:

  • STMT: creates and manages streams, tables, and persistent queries, for example with CREATE, DROP, or TERMINATE statements;
  • STREAM: runs a ksqlDB push query, which keeps returning updates until the duration set by the streamTimeout translator property elapses;
  • QUERY: runs a pull query and returns its results as a single batch once the query completes.

If the type parameter is omitted, the default is QUERY.
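Because QUERY is the default, the type argument can be left out for pull queries. The following sketch, assuming the TEST_STREAM stream from the examples below exists, is intended to behave the same as passing 'QUERY' explicitly:

SQL
-- No type argument: runs as a pull query (QUERY)
SELECT cast(tuple AS string) FROM ksqldb_source.native('SELECT * FROM TEST_STREAM;') ;;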

Examples

1. STMT: CREATE STREAM statement:

SQL
CALL ksqldb_source.native('CREATE STREAM TEST_STREAM (INT_COLUMN INTEGER, STR_COLUMN STRING) WITH (KAFKA_TOPIC=''test_stream_topic'', KEY_FORMAT=''KAFKA'', PARTITIONS=2, VALUE_FORMAT=''JSON'');','STMT') ;;

2. STMT: DROP STREAM statement:

SQL
CALL ksqldb_source.native('DROP STREAM TEST_STREAM;','STMT') ;;

3. STREAM: different ways of reading the results of a push query:

SQL
SELECT cast(tuple AS string) FROM ksqldb_source.native('SELECT * FROM TEST_STREAM EMIT CHANGES;','STREAM') ;;

SELECT cast(tuple[1] as integer) as int_column FROM ksqldb_source.native('SELECT INT_COLUMN FROM TEST_STREAM EMIT CHANGES;','STREAM') ;;

SELECT x.* 
	FROM ksqldb_source.native('SELECT INT_COLUMN, STR_COLUMN FROM TEST_STREAM EMIT CHANGES;','STREAM') r, 
	ARRAYTABLE(r.tuple COLUMNS "int" integer, "str" string) x ;;

4. QUERY: different ways of reading the results of a pull query:

SQL
SELECT cast(tuple AS string) FROM ksqldb_source.native('SELECT AS_VALUE(INT_COLUMN), MASK(STR_COLUMN) FROM TEST_STREAM;','QUERY') ;;

SELECT cast(tuple[1] as string) as str_column FROM ksqldb_source.native('SELECT MASK(STR_COLUMN) FROM TEST_STREAM;','QUERY') ;;

SELECT x.* 
	FROM ksqldb_source.native('SELECT AS_VALUE(INT_COLUMN), MASK(STR_COLUMN) FROM TEST_STREAM;','QUERY') r, 
	ARRAYTABLE(r.tuple COLUMNS "int" integer, "str" string) x ;;
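STMT also covers persistent queries, which the examples above do not show. In the sketch below, assuming TEST_STREAM exists, a CREATE STREAM ... AS SELECT statement starts a persistent query and TERMINATE stops it. The stream name TEST_STREAM_POSITIVE and the query ID CSAS_TEST_STREAM_POSITIVE_0 are hypothetical; look up the actual ID (for example with SHOW QUERIES in the ksqlDB CLI) before terminating:

SQL
-- Start a persistent query that continuously copies positive rows into a new stream
CALL ksqldb_source.native('CREATE STREAM TEST_STREAM_POSITIVE AS SELECT INT_COLUMN, STR_COLUMN FROM TEST_STREAM WHERE INT_COLUMN > 0 EMIT CHANGES;','STMT') ;;

-- Stop the persistent query (hypothetical query ID), then drop its output stream
CALL ksqldb_source.native('TERMINATE CSAS_TEST_STREAM_POSITIVE_0;','STMT') ;;
CALL ksqldb_source.native('DROP STREAM TEST_STREAM_POSITIVE;','STMT') ;;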

Limitations

  • No operations are pushed down to ksqlDB except the LIMIT clause;
  • Authentication is not supported;
  • Compound and binary data types are not fully supported;
  • Push queries can only be executed through the native() procedure;
  • Query timeout settings (such as streamTimeout) apply per data source, not per query.
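Because the push-query timeout (streamTimeout) is set per data source, one possible workaround, sketched here as an assumption rather than a documented pattern, is to register a second connection and data source for the same ksqlDB server with a longer timeout and send long-running push queries there. The name ksqldb_long and the PT2M value are illustrative:

SQL
-- Second connection and data source pointing at the same ksqlDB server
CALL SYSADMIN.createConnection('ksqldb_long','ksqldb','host=localhost,port=8088') ;;
CALL SYSADMIN.createDatasource('ksqldb_long','ksqldb','','supportsNativeQueries=true,streamTimeout=PT2M') ;;

-- This push query collects updates for up to 2 minutes instead of the default 5 seconds
SELECT cast(tuple AS string) FROM ksqldb_long.native('SELECT * FROM TEST_STREAM EMIT CHANGES;','STREAM') ;;

Pull queries and statements can keep using the original data source; only push queries that need the longer window go through ksqldb_long.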


The ksqlDB connector has been available since v4.2.
