Apache Flink is an open source framework for data processing in both stream and batch mode. It supports a variety of different data platforms, including Apache Kafka and any JDBC database. Flink’s superpowers come in a variety of languages, from the more traditional Java and Scala all the way to Python. For Python there is PyFlink, which you can use alongside the best machine learning libraries available.
However, in the data world, one of the most commonly known languages is SQL. From data analysts to scientists and engineers, SQL is usually part of the toolkit of data practitioners. SQL provides an abstraction to data access and manipulation that goes beyond technologies and trends. You can use it to query your data no matter what the underlying tech is (with minor dialect differences).
With Apache Flink, you can define whole data pipelines in pure SQL using its SQL Client. This blog post will get you set up with a local Docker-based platform for Apache Flink, including an SQL client.
Set up Apache Flink locally with Docker Compose
As mentioned above, Apache Flink is a very interesting technology and worth trying out. When evaluating a new tool, running it locally is a double-edged sword. On the one hand, you get the benefit of understanding its inner mechanism better. On the other, you also get the pain of the setup phase.
To skip the setup pain, try Docker. It offers tools as pre-packaged solutions that are portable to any host OS with minimal effort.
When looking for content about Flink’s SQL client, I found a demo on Apache Flink’s website, which is quite good but not suited to casual experimentation. That’s why I’ve created a lighter tutorial. This article, with its related GitHub repository, takes the minimal building blocks and provides basic Flink functionality, expecting any data pipeline source or target to be available outside the containers. Before starting, make sure that both Docker and Docker Compose are installed.
The whole code is contained in the aiven/flink-sql-cli-docker repository, which we can clone with the following call in our terminal:
git clone https://github.com/aiven/flink-sql-cli-docker.git
Now let’s open the flink-sql-cli-docker folder and start Docker Compose:
docker-compose up -d
This will start 3 Apache Flink nodes in the background: a jobmanager, a taskmanager and the sql-client. We can review the details of the cluster like this:
docker-compose ps
This should show the three containers being in the Up state:
Name                                 Command                          State   Ports
flink-sql-cli-docker_jobmanager_1    /docker-entrypoint.sh jobm ...   Up      6123/tcp, 0.0.0.0:8081->8081/tcp
flink-sql-cli-docker_sql-client_1    /docker-entrypoint.sh            Up      6123/tcp, 8081/tcp
flink-sql-cli-docker_taskmanager_1   /docker-entrypoint.sh task ...   Up      6123/tcp, 8081/tcp
Flink’s web UI is now available at localhost:8081. This is a useful tool for browsing information about Flink's status and the data pipelines we're going to create.
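The same status information can also be fetched from the terminal. The sketch below assumes that port 8081 serves Flink's REST API alongside the web UI, and that its /overview endpoint is reachable on localhost as published by Docker Compose. The FLINK_HOST and FLINK_PORT variable names are made up for this example.

```shell
# Assumed defaults: the web UI port published by Docker Compose on localhost.
# FLINK_HOST and FLINK_PORT are illustrative names, not part of the repository.
FLINK_HOST="${FLINK_HOST:-localhost}"
FLINK_PORT="${FLINK_PORT:-8081}"
OVERVIEW_URL="http://${FLINK_HOST}:${FLINK_PORT}/overview"

# Print the cluster overview as JSON, or a hint if the cluster
# (or curl itself) is not available.
if ! curl -sf --max-time 5 "$OVERVIEW_URL"; then
  echo "Flink REST API not reachable at $OVERVIEW_URL"
fi
```

If the cluster is up, the response should be a small JSON document describing task managers, slots and jobs. Otherwise the hint is printed instead.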
Notes about setting up Docker Compose
In docker-compose.yml we map the settings subfolder to the jobmanager Docker container's /settings folder. This way, settings files can be passed between host and guest, which is helpful if specific host-generated files (e.g. keystores) are needed for authentication.
The data subfolder is also mapped to the taskmanager and jobmanager containers. This is purely because I needed it for the SQL example below, but it could be useful in cases where we want to test Apache Flink behaviour against files in a local file system. The data subfolder contains a test.csv file with made-up data that we'll use to test Apache Flink.
Unleashing the power of SQL
If we want to play with Flink’s SQL, we need to enter the sql-client container. We can do that by executing the following command in the terminal:
docker exec -it flink-sql-cli-docker_sql-client_1 /bin/bash
Now that we’re in, we can start Flink’s SQL client by running the sql-client.sh launcher script.
There we are! We have a fully functional SQL client that we can use to create data pipelines attaching to a variety of data sources and targets. As a little demo, we can query the test.csv file within the flink-sql-cli-docker/data folder by defining the associated Flink table within the SQL Client. The column names below match the query output that follows, while the column types are an assumption:
create table people_job (id INT, name STRING, job STRING, salary BIGINT) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///data/test.csv',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true'
);
And we can query the table:
select * from people_job;
With the following results:
+/-   id   name      job                      salary
+     1    Ugo       Football Player          200000
+     2    Carlo     Crocodile domesticator   30000
+     3    Maria     Software Engineer        210000
+     4    Sandro    UX Designer              70000
+     5    Melissa   Software Engineer        95000
Exit Flink’s table view by pressing Q.
Create a Data Target: PostgreSQL
Let’s assume that we want to push the aggregated average salary and people count by job to a PostgreSQL table. If you don’t have a PostgreSQL instance, you can quickly create one with the following Aiven CLI command in a new terminal window:
avn service create pg-flink \
    -t pg \
    --cloud google-europe-west3 \
    -p startup-4
This creates a PostgreSQL instance (-t pg) in google-europe-west3 with a startup-4 plan. Let's wait for the service to be ready:
avn service wait pg-flink
Let’s create a target table job_summary that we'll use to push data from Flink. From the same terminal window we can execute the following:
avn service cli pg-flink
create table job_summary (
    job VARCHAR PRIMARY KEY,
    avg_salary BIGINT, nr_people BIGINT  -- column types assumed
);
Create the SQL pipeline
Now it’s time to retrieve the connection parameters to PostgreSQL with the following command in a new terminal window:
avn service get pg-flink \
    --format '{service_uri_params}'
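The output should contain the connection parameters of the service. Take note of the host, port, dbname, user and password details and use them to create a Flink table pointing to PostgreSQL. Paste the following SQL into Flink's SQL Client, replacing the placeholders. The column types are assumed to mirror the PostgreSQL table:
create table job_summary_flink (
    job STRING,
    avg_salary BIGINT,
    nr_people BIGINT,
    PRIMARY KEY (job) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://<host>:<port>/<dbname>?sslmode=require',
    'table-name' = 'job_summary',
    'username' = '<username>',
    'password' = '<password>'
);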
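To avoid typos when filling in the url option, the JDBC URL can be assembled in the terminal first. This is only a sketch: the PG_HOST, PG_PORT and PG_DBNAME variable names and their values are hypothetical placeholders, to be replaced with the details of your own service.

```shell
# Hypothetical placeholder values; replace them with the host, port and
# dbname reported for your own PostgreSQL service.
PG_HOST="pg-flink.example.com"
PG_PORT="5432"
PG_DBNAME="defaultdb"

# Same URL shape as in the Flink table definition, with SSL required.
JDBC_URL="jdbc:postgresql://${PG_HOST}:${PG_PORT}/${PG_DBNAME}?sslmode=require"
echo "$JDBC_URL"
```

The printed string can then be pasted as the value of 'url' in the table definition.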
Now let’s create the SQL pipeline with the following command in Flink SQL Client:
insert into job_summary_flink
select job, avg(salary), count(*)
from people_job
group by job;
We should see an output from the SQL Client like this:
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: b2d8b019c6c6e3dc5fe63902a14c13a9
And we can now check that Flink’s table has been correctly populated with the following SQL from Flink’s CLI:
select * from job_summary_flink;
which results in
+/-   job                      avg_salary   nr_people
+     UX Designer              70000        1
+     Crocodile domesticator   30000        1
+     Football Player          200000       1
+     Software Engineer        152500       2
We can also check that we obtain the same result from PostgreSQL with the following command in the PostgreSQL client terminal window:
select * from job_summary;
          job           | avg_salary | nr_people
------------------------+------------+-----------
 UX Designer            |      70000 |         1
 Crocodile domesticator |      30000 |         1
 Football Player        |     200000 |         1
 Software Engineer      |     152500 |         2
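As a sanity check on these numbers, the same aggregation can be reproduced outside Flink with a few lines of shell. The rows are copied from the earlier query output. The comma-separated layout without a header line is an assumption about how data/test.csv is organised, and the csv_rows and summary names are made up for this sketch.

```shell
# Rows copied from the people_job query output; the comma-separated,
# header-less layout is an assumption about data/test.csv.
csv_rows='1,Ugo,Football Player,200000
2,Carlo,Crocodile domesticator,30000
3,Maria,Software Engineer,210000
4,Sandro,UX Designer,70000
5,Melissa,Software Engineer,95000'

# Group by the job column (3rd field), then emit
# job|average salary|number of people, sorted by job name.
summary=$(printf '%s\n' "$csv_rows" | awk -F',' '
  { total[$3] += $4; people[$3]++ }
  END {
    for (job in total)
      printf "%s|%d|%d\n", job, total[job] / people[job], people[job]
  }
' | sort)

printf '%s\n' "$summary"
```

The two Software Engineer rows average to (210000 + 95000) / 2 = 152500, which matches what both Flink and PostgreSQL report.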
This blog post provides a way to get Apache Flink’s SQL Client as a set of Docker containers. It expects additional data sources or targets to be available outside the containers and can be used to start the learning journey into Flink. The SQL pipeline example shows an integration between a local CSV file and PostgreSQL.
Watch this space: we’ll publish more examples of streaming pipelines in the future, using Apache Flink in conjunction with other Aiven managed data services!
Stay tuned for more content about Apache Flink in conjunction with Aiven’s other data services.
Originally published at https://aiven.io.