Using Kafka Connect JDBC Source: a PostgreSQL example

Creating the PostgreSQL Source system

avn service create pg-football \ -t pg \ --cloud google-europe-west3 \ -p business-4
avn service wait pg-football

Time to Scout Football Players

avn service cli pg-football
CREATE TABLE football_players (
name VARCHAR ( 50 ) PRIMARY KEY,
nationality VARCHAR ( 255 ) NOT NULL,
is_retired BOOLEAN DEFAULT false,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
modified_at TIMESTAMP
)
;
CREATE OR REPLACE FUNCTION change_modified_at()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS
$$
BEGIN
NEW.modified_at := NOW();
RETURN NEW;
END;
$$
;
CREATE TRIGGER modified_at_updates
BEFORE UPDATE
ON football_players
FOR EACH ROW
EXECUTE PROCEDURE change_modified_at();

Football Scouting App at Work

insert into football_players (name, nationality, is_retired) values ('Andrea Pirlo','Italian', true);
insert into football_players (name, nationality, is_retired) values ('Cristiano Ronaldo','Portuguese', false);
insert into football_players (name, nationality, is_retired) values ('Megan Rapinoe','American', true);
select * from football_players;
name            | nationality   | is_retired | created_at | modified_at
-------------------+-------------+------------+----------------------------+-------------
Andrea Pirlo | Italian | t | 2021-03-11 10:35:52.04076 |
Cristiano Ronaldo | Portuguese| f | 2021-03-11 10:35:52.060104 |
Megan Rapinoe | American | t | 2021-03-11 10:35:52.673554 |
(3 rows)
update football_players set is_retired=false where name='Andrea Pirlo';
name        | nationality | is_retired |         created_at         |        modified_at         
------------------+-------------+------------+----------------------------+----------------------------
Cristiano Ronaldo | Portuguese | f | 2021-03-11 10:35:52.060104 |
Megan Rapinoe | American | t | 2021-03-11 10:35:52.673554 |
Andrea Pirlo | Italian | f | 2021-03-11 10:35:52.04076 | 2021-03-11 10:39:49.198286
(3 rows)

Creating a Kafka environment

avn service create kafka-football         \
-t kafka \
--cloud google-europe-west3 \
-p business-4 \
-c kafka.auto_create_topics_enable=true \
-c kafka_connect=true

Connecting the dots

Create a JSON configuration file

{
"name": "pg-timestamp-source",
"connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://<HOSTNAME>:<PORT>/<DATABASE>?sslmode=require",
"connection.user": "<PG_USER>",
"connection.password": "<PG_PASSWORD>",
"table.whitelist": "football_players",
"mode": "timestamp",
"timestamp.column.name":"modified_at,created_at",
"poll.interval.ms": "2000",
"topic.prefix": "pg_source_"
}
avn service get pg-football --format '{service_uri_params}'

Start the JDBC connector

avn service connector create kafka-football @kafka_jdbc_config.json
avn service connector status kafka-football pg-timestamp-source
{
"status": {
"state": "RUNNING",
"tasks": [
{
"id": 0,
"state": "RUNNING",
"trace": ""
}
]
}
}

Check the data in Kafka with Kafkacat

mkdir -p kafkacerts
avn service user-creds-download kafka-football \
-d kafkacerts \
--username avnadmin
avn service get kafka-football --format '{service_uri}'
bootstrap.servers=<KAFKA_SERVICE_URI>
security.protocol=ssl
ssl.key.location=kafkacerts/service.key
ssl.certificate.location=kafkacerts/service.cert
ssl.ca.location=kafkacerts/ca.pem
kafkacat -F kafkacat.config -C -t pg_source_football_players
{"name":"Cristiano Ronaldo","nationality":"Portuguese","is_retired":false,"created_at":1615458952060,"modified_at":null} {"name":"Megan Rapinoe","nationality":"American","is_retired":true,"created_at":1615458952673,"modified_at":null} {"name":"Andrea Pirlo","nationality":"Italian","is_retired":false,"created_at":1615458952040,"modified_at":1615459189198} % Reached end of topic pg_source_football_players [0] at offset 3

Updating the listings

insert into football_players (name, nationality, is_retired) values ('Enzo Gorlami','Italian', false);
update football_players set is_retired=true where name='Cristiano Ronaldo';
sql
defaultdb=> select * from football_players;
name | nationality | is_retired | created_at | modified_at
-------------------+-------------+------------+----------------------------+----------------------------
Megan Rapinoe | American | t | 2021-03-11 10:35:52.673554 |
Andrea Pirlo | Italian | f | 2021-03-11 10:35:52.04076 | 2021-03-11 10:39:49.198286
Enzo Gorlami | Italian | f | 2021-03-11 11:09:49.411885 |
Cristiano Ronaldo | Portuguese | t | 2021-03-11 10:35:52.060104 | 2021-03-11 11:11:36.790781
(4 rows)
{"name":"Enzo Gorlami","nationality":"Italian","is_retired":false,"created_at":1615460989411,"modified_at":null}
% Reached end of topic pg_source_football_players [0] at offset 4
{"name":"Cristiano Ronaldo","nationality":"Portuguese","is_retired":true,"created_at":1615458952060,"modified_at":1615461096790}
% Reached end of topic pg_source_football_players [0] at offset 5

Further reading

Wrapping up

Your database in the cloud, www.aiven.io

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store