ksqlDB Connector for Kafka (Preview)
ksqlDB is a database purpose-built to help developers create stream processing applications on top of Apache Kafka. It offers real-time data processing and analytics capabilities with an SQL-like language. ksqlDB lets users work with data as streams, tables, or materialized views and is directly integrated with Kafka topics. This connector uses ksqlDB's capabilities to provide easy access to Apache Kafka streams using SQL. You can run both pull and push SELECT queries, along with some other statements such as CREATE or DROP.
Type Name
ksqldb
Connection Properties
Template name: ksqldb
Appropriate translator name: ksqldb
Properties:
- host (default: localhost)
- port (default: )
- min-pool-size (default: 2)
- max-pool-size (default: 70)
- cloudAgent (default: FALSE)
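As an illustration of how the properties above might be combined, the following sketch sets the pool sizes explicitly. It assumes that min-pool-size and max-pool-size are passed in the same comma-separated key=value string as host and port; the connection name ksqldb_pooled is hypothetical.

```sql
-- Sketch only: assumes the pool properties use the same key=value syntax as host and port.
-- 'ksqldb_pooled' is a hypothetical connection name; the values shown are the listed defaults.
CALL SYSADMIN.createConnection('ksqldb_pooled','ksqldb','host=localhost,port=8088,min-pool-size=2,max-pool-size=70') ;;
```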
Data Source Creation
A data source can be created using the following SQL statements:
CALL SYSADMIN.createConnection('ksqldb_source','ksqldb','host=localhost,port=8088') ;;
CALL SYSADMIN.createDatasource('ksqldb_source','ksqldb','','supportsNativeQueries=true') ;;
Translator Properties
Name | Description | Default value |
---|---|---|
streamTimeout | Specifies the duration for which a STREAM native query (a ksqlDB push query) keeps providing updates | |
supportsNativeQueries | Makes available the native() system procedure that can be used to pass native queries directly to an underlying DBMS | FALSE |
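Translator properties are passed as the last argument of SYSADMIN.createDatasource. The sketch below sets both properties from the table together. The streamTimeout value is a placeholder: this page does not state the unit or the default, so check them before using a concrete number.

```sql
-- Sketch only: 30000 is a placeholder value; the unit of streamTimeout is not stated on this page.
CALL SYSADMIN.createDatasource('ksqldb_source','ksqldb','','supportsNativeQueries=true,streamTimeout=30000') ;;
```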
Usage
The connector supports two distinct query types: regular SQL and native queries. Regular SQL queries are limited to SELECT and INSERT statements. SELECT statements are internally translated into ksqlDB pull queries. To execute ksqlDB push queries, CREATE and DROP statements, or any statement that uses ksqlDB native functions, use native queries.
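Since regular SQL also covers INSERT, a minimal sketch of such a statement could look as follows. It assumes a stream named TEST_STREAM with the columns INT_COLUMN and STR_COLUMN already exists in ksqlDB and is visible through a data source named ksqldb_source; both names and the inserted values are illustrative.

```sql
-- Sketch only: assumes TEST_STREAM (INT_COLUMN INTEGER, STR_COLUMN STRING) exists
-- and is exposed through the data source ksqldb_source.
INSERT INTO "ksqldb_source.TEST_STREAM" ("INT_COLUMN", "STR_COLUMN") VALUES (1, 'first row') ;;
```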
SELECT Queries
SELECT queries in CData Virtuality behave like regular SELECT queries, but they retrieve the entire content of a ksqlDB materialized view or stream via a pull query. Here is an example:
SELECT "LOGGER", "LEVEL", "TIME", "MESSAGE" FROM "ksqldb.KSQL_PROCESSING_LOG" ;;
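Because LIMIT is the only clause that is pushed down to ksqlDB (see Limitations), adding it is one way to avoid retrieving the whole stream. A minimal sketch based on the query above, with an arbitrary row count:

```sql
-- Sketch only: same source object as the example above; the row count 10 is arbitrary.
SELECT "LOGGER", "LEVEL", "TIME", "MESSAGE" FROM "ksqldb.KSQL_PROCESSING_LOG" LIMIT 10 ;;
```

Other clauses, such as WHERE or ORDER BY, are not pushed down and would be evaluated on the CData Virtuality side after the data has been retrieved.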
Native Queries
The native() procedure executes queries written in native ksqlDB syntax. Currently, it supports three types of native queries:
- STMT: creates and manages streams, tables, and persistent queries. It can be used for operations such as CREATE, DROP, or TERMINATE;
- STREAM: executes ksqlDB push queries. It starts a query that continually provides a stream of updates for the duration set by the streamTimeout translator property described above;
- QUERY: runs pull queries. The results are returned as a single batch, delivered when the query completes.
If the type parameter is omitted, the default is QUERY.
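To illustrate the default, the following sketch calls native() without the second argument, so the statement should be treated as a QUERY (pull query). It assumes a data source named ksqldb_source with supportsNativeQueries=true and an existing stream TEST_STREAM with a STR_COLUMN column.

```sql
-- Sketch only: the type argument is omitted, so QUERY is used by default.
-- Assumes the stream TEST_STREAM with a STR_COLUMN column exists.
SELECT cast(tuple AS string) FROM ksqldb_source.native('SELECT MASK(STR_COLUMN) FROM TEST_STREAM;') ;;
```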
Examples
1. STMT: CREATE STREAM query:
CALL ksqldb_source.native('CREATE STREAM TEST_STREAM (INT_COLUMN INTEGER, STR_COLUMN STRING) WITH (KAFKA_TOPIC=''test_stream_topic'', KEY_FORMAT=''KAFKA'', PARTITIONS=2, VALUE_FORMAT=''JSON'');','STMT') ;;
2. STMT: DROP STREAM query:
CALL ksqldb_source.native('DROP STREAM TEST_STREAM;','STMT') ;;
3. STREAM: different ways of getting data for a push query:
SELECT cast(tuple AS string) FROM ksqldb_source.native('SELECT * FROM TEST_STREAM EMIT CHANGES;','STREAM') ;;
SELECT cast(tuple[1] as integer) as int_column FROM ksqldb_source.native('SELECT INT_COLUMN FROM TEST_STREAM EMIT CHANGES;','STREAM') ;;
SELECT x.*
FROM ksqldb_source.native('SELECT INT_COLUMN, STR_COLUMN FROM TEST_STREAM EMIT CHANGES;','STREAM') r,
ARRAYTABLE(r.tuple COLUMNS "int" integer, "str" string) x ;;
4. QUERY: different ways of getting data for a pull query:
SELECT cast(tuple AS string) FROM ksqldb_source.native('SELECT AS_VALUE(INT_COLUMN), MASK(STR_COLUMN) FROM TEST_STREAM;','QUERY') ;;
SELECT cast(tuple[1] as string) as str_column FROM ksqldb_source.native('SELECT MASK(STR_COLUMN) FROM TEST_STREAM;','QUERY') ;;
SELECT x.*
FROM ksqldb_source.native('SELECT AS_VALUE(INT_COLUMN), MASK(STR_COLUMN) FROM TEST_STREAM;','QUERY') r,
ARRAYTABLE(r.tuple COLUMNS "int" integer, "str" string) x ;;
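The STMT type is also described as covering TERMINATE, which the examples above do not show. A minimal sketch, assuming a persistent query with the hypothetical ID CSAS_TEST_STREAM_COPY_1 is running in ksqlDB:

```sql
-- Sketch only: CSAS_TEST_STREAM_COPY_1 is a hypothetical persistent query ID;
-- replace it with the ID of a query that actually exists in your ksqlDB instance.
CALL ksqldb_source.native('TERMINATE CSAS_TEST_STREAM_COPY_1;','STMT') ;;
```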
Limitations
- Pushdown capabilities are not available except for the LIMIT clause;
- Authentication is not supported;
- Compound and binary data types are not fully supported;
- Push queries are executable solely through the native() procedure;
- Query timeout settings are applicable per data source, not on a per-query basis.
ksqlDB connector available since v4.2