Clickstream data plays an important role in analyzing customer behavior and helps organizations shape future business strategy. So let’s discuss real-time clickstream analysis using one of the most prominent components of the Kafka ecosystem, ksqlDB. It not only provides capabilities for processing and analyzing real-time data, but also lets organizations connect to sources and sinks.
Let’s Get Started
Let’s skip the theory and come straight to the hands-on aspects of clickstream analysis using ksqlDB.
Before starting with this blog, go through our blog on the Quickstart Guide for Installing Confluent Platform having Apache Kafka using Docker, as this post picks up where that one leaves off.
Once you are done setting up the Confluent Platform using Docker, proceed with this blog. To verify that the services are up and running, run the following command:
docker-compose ps
Now, log in to the Confluent Control Center UI: http://localhost:9021
Note: If you are deploying on the cloud, make sure you have added the necessary firewall rules for the Confluent Control Center UI.
Click on cluster 1, i.e. controlcenter.cluster.
The DatagenConnector provides some simple schemas for generating example clickstream, orders, users, and pageviews data.
Generate Example ClickStream Records
The clickstream quickstart option produces records with fields such as userid, time, request, ip, and agent.
Inside the Connect tab, select the default cluster.
Then click on Add Connector.
Search for datagen and select the DatagenConnector.
Create a Datagen connector named “clickstream_generator”.
The DatagenConnector will generate sample clickstream data into the clickstream topic.
Scroll down to the General tab and, under Kafka topic, create a topic named “clickstream”.
Then set max.interval: 100 and iterations: 500000, and select quickstart: clickstream.
Then click Continue.
After setting up the configuration, verify it, then click Launch.
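If you prefer to skip the UI, a connector with the same settings can also be created through the Kafka Connect REST API. This is a sketch, assuming Connect is exposed on localhost:8083 as in the standard cp-all-in-one setup:
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "clickstream_generator",
    "config": {
      "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
      "kafka.topic": "clickstream",
      "quickstart": "clickstream",
      "max.interval": "100",
      "iterations": "500000",
      "tasks.max": "1"
    }
  }'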
Datagen now generates data into the clickstream topic.
You can verify this by opening a new terminal and executing the commands below:
docker exec -it broker bash
kafka-console-consumer --bootstrap-server localhost:9092 --topic clickstream --from-beginning
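Note: if your Connect worker is configured with the Avro converter (as in the standard cp-all-in-one setup), the plain console consumer will print unreadable bytes. In that case, use kafka-avro-console-consumer from the schema-registry container instead; this assumes the broker is reachable at broker:29092 inside the Docker network:
docker exec -it schema-registry bash
kafka-avro-console-consumer --bootstrap-server broker:29092 --topic clickstream --from-beginning --property schema.registry.url=http://localhost:8081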
Now open a new terminal and execute the command below to open the ksqlDB CLI:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
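Before running any queries, it is worth setting the offset reset policy so that queries read the topic from the beginning rather than only new records:
ksql> SET 'auto.offset.reset' = 'earliest';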
Let’s start by checking whether the clickstream topic exists:
ksql> show topics;
To check the keys and values of your messages, you can use the PRINT command:
ksql> PRINT 'clickstream' FROM BEGINNING LIMIT 1;
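PRINT also accepts an INTERVAL option if you want to sample the topic rather than dump every record; for example, this prints every fifth record and stops after three:
ksql> PRINT 'clickstream' FROM BEGINNING INTERVAL 5 LIMIT 3;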
Inspect Topics Using ksqlDB in Control Center
In the cluster submenu, click ksqlDB and open the ksqlDB Editor on the default application.
Use the ksqlDB Editor page to create streams and tables, run SQL queries, and browse messages.
Let’s create a stream using the ksqlDB editor in the Control Center UI that includes log-data fields such as userid, time, request, ip, and agent.
CREATE STREAM LOG_DATA_STREAM (time VARCHAR, ip VARCHAR, request VARCHAR, userid INT, agent VARCHAR) WITH (KAFKA_TOPIC='clickstream', VALUE_FORMAT='AVRO');
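You can confirm the schema of the new stream with DESCRIBE:
DESCRIBE LOG_DATA_STREAM;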
After you create streams or tables, the resizable streams pane is populated with the defined streams and tables for the application, including the new LOG_DATA_STREAM:
- Streams are indicated with a blue St expandable node:
- Tables are indicated with a brown Tb expandable node:
Use a SELECT query to verify the output:
SELECT * FROM LOG_DATA_STREAM EMIT CHANGES;
Session Window
A session window aggregates records into a session, which represents a period of activity separated by a specified gap of inactivity, or “idleness”. Any records with timestamps that occur within the inactivity gap of existing sessions are merged into the existing sessions. If a record’s timestamp occurs outside of the session gap, a new session is created.
To count the number of events per user for session windows with a session inactivity gap of 60 seconds, run the following query, which sessionizes the input data and performs the counting/aggregation step per userid:
SELECT userid, COUNT(*) AS num_of_events FROM LOG_DATA_STREAM WINDOW SESSION (60 SECONDS) GROUP BY userid EMIT CHANGES;
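On recent ksqlDB versions, windowed aggregates also expose the WINDOWSTART and WINDOWEND pseudo-columns, which are handy for seeing exactly where each session begins and ends:
SELECT userid, WINDOWSTART, WINDOWEND, COUNT(*) AS num_of_events FROM LOG_DATA_STREAM WINDOW SESSION (60 SECONDS) GROUP BY userid EMIT CHANGES;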
A Series of Basic Log-Data Analytics Using the ksqlDB CLI
Now let’s analyze the log stream. Go back to the ksqlDB CLI.
Step 1: Create a table using a CTAS (CREATE TABLE AS SELECT) statement over the clickstream-backed LOG_DATA_STREAM to count the number of events per user.
ksql> CREATE TABLE tasks_per_min AS SELECT userid, COUNT(*) AS events FROM log_data_stream GROUP BY userid;
SELECT * FROM tasks_per_min EMIT CHANGES LIMIT 10;
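Note that tasks_per_min as written actually counts all events per user since the stream began, not per minute. If you want true per-minute counts, one option is to add a tumbling window (a sketch; the table name events_per_min is our own):
ksql> CREATE TABLE events_per_min AS SELECT userid, COUNT(*) AS events FROM log_data_stream WINDOW TUMBLING (SIZE 60 SECONDS) GROUP BY userid;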
Step 2: Now create a table that records how many times each user agent was used to visit the various web pages.
ksql> CREATE TABLE agents_used AS SELECT userid, agent, COUNT(*) FROM log_data_stream GROUP BY userid, agent;
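As before, you can verify the table with a push query:
ksql> SELECT * FROM agents_used EMIT CHANGES LIMIT 10;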
Step 3: Now create a table that shows which pages are visited and how many times.
ksql> CREATE TABLE pages_visited AS SELECT request, COUNT(*) FROM log_data_stream GROUP BY request;
SELECT * FROM pages_visited EMIT CHANGES LIMIT 20;
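On ksqlDB versions that support pull queries, pages_visited is materialized and keyed by request, so you can also look up the current count for a single page. The request value below is only illustrative; substitute one you actually saw in the output above:
ksql> SELECT * FROM pages_visited WHERE request = 'GET /site/login.html HTTP/1.1';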
Congrats, you have just performed clickstream analysis in real time using ksqlDB on the Confluent Platform!