Building and Deploying a Real-Time Stream Processing ETL Engine with Kafka and ksqlDB

Dec 20, 2020

This article was originally published in the Towards Data Science publication on Medium.

Enabling a Powerful Search Capability — From Design Decisions to everything behind the scenes

Search-Feature

At Koverhoop, we are building a couple of large-scale projects in the insurance domain. For one of them, klient.ca, a multi-tenant group insurance brokerage platform, we needed a powerful search capability built by denormalizing incoming events in real time. We also wanted search results to appear as you type. Below is what we were able to achieve; in this post I’ll talk about the core infrastructure, how we fully automate its deployment, and how you too can set it up really quickly.

Search-Gif

Search in Action

Consider this a two-part series, in which I’ll discuss the following:

Part 1: Understanding the stack used to power this Search Capability and deploying it using Docker and docker-compose. (this post)

Part 2: Making a Scalable Production deployment of these services using Kubernetes. (yet to be published)

Problem Definition and Decisions Taken

To build a search engine that was quick and real-time, we had to make certain design decisions. We use Postgres as our primary database, so we had the following options:

  1. Query the Postgres database directly for each character we type in the search bar. 😐
  2. Use an effective search database like Elasticsearch, and store data in a denormalized fashion. 🤔

Considering that we were already a multi-tenant application, that the entities to be searched could require a lot of joins (if we used Postgres), and that the scale we were projecting was quite high, we decided against the former option of querying the database directly, because of its performance and latency implications.

So we had to decide on a reliable and efficient way to move data from Postgres to Elasticsearch in real time. Again, the following decisions had to be made:

  1. Use Logstash to query the Postgres database at regular intervals and send data to Elasticsearch. 😶
  2. Or use the Elasticsearch client in our application and CRUD the data both in Postgres and Elasticsearch together. 🧐
  3. Or use an event-based streaming engine that retrieves events from Postgres’ write-ahead logs, streams them to a stream processing server, enriches the streams, and sinks them to Elasticsearch. 🤯

Option 1 was struck out pretty quickly: it was not real-time, and even if we queried at shorter intervals it would put a significant load on the Postgres server and could incur additional query latency and server instability as our data grows. Choosing between the other two options could be a different decision at different companies. For our use case, we could foresee some issues if we chose option 2:

  1. Either we would need to maintain transactions and rollbacks across two kinds of databases, Postgres and Elasticsearch: we would have to rely on both to persist every update, and if one of them were slow to acknowledge, it could slow down our application.
  2. Or, if we just allowed asynchronous updates to Elasticsearch, how would we retry inserting an event or a group of events in case of inconsistency?

Search-High-level

Hence, we decided to build an event-based queue infrastructure, also because we had planned future use cases and services that fit naturally on top of events, such as notification services, data warehousing, and a microservice architecture. Without further ado, let’s jump into a general overview of the solution and the services used.

Basic Overview of the Services

For the implementation of an event-based streaming infrastructure, we decided to go with the Confluent Kafka Stack.

Search-Confluent

Image Source: Confluent Inc.

Following are the services we incorporated:

Apache Kafka: Kafka is an open-source distributed event streaming platform. It is the main store for our database events (inserts, updates, and deletes).

Kafka Connect: We use Kafka Connect to ingest data into Kafka via Debezium’s Postgres connector, which fetches events from the Postgres WAL files.
On the sink side, we use the Elasticsearch connector to process and load data into Elasticsearch. Connect can run either as a standalone application or as a fault-tolerant, scalable service for production environments.

ksqlDB: ksqlDB lets you build stream processing applications on top of data in Kafka. Internally it uses Kafka Streams to transform events as they arrive. We use it to enrich the events of a particular stream with pre-existing events from other tables already persisted in Kafka that are relevant for search, for example the tenant_id from the root table.

Search-Merge

With ksqlDB, filtering, aggregating, joining, and enriching data is as easy as writing SQL queries. For example, let's assume we receive a stream of events on two topics carrying information about brands and brand_products. Since this is a multi-tenant data source, we need to enrich brand_products with the tenant_id, which is currently only associated with brands. We can then store these enriched records in Elasticsearch in a denormalized form (to make the search work).

We can set up a KStream using a topic:

CREATE STREAM "brands"
WITH (
    kafka_topic = 'store.public.brands',
    value_format = 'avro'
);

To use only a few columns and partition the stream by id, we can create a new stream called enriched_brands:

CREATE STREAM "enriched_brands"
WITH (
    kafka_topic = 'enriched_brands'
)
AS
    SELECT
        CAST(brand.id AS VARCHAR) as "id",
        brand.tenant_id as "tenant_id",
        brand.name as "name"
    FROM
        "brands" brand
    PARTITION BY
        CAST(brand.id AS VARCHAR)
    EMIT CHANGES;

The set of events can then be materialized into a KTable, keeping the latest value per key by offset. We use this so that we can join the current state of the brand events with another stream.

CREATE TABLE "brands_table"
AS
    SELECT
        id as "id",
        latest_by_offset(tenant_id) as "tenant_id"
    FROM
        "brands" group by id
    EMIT CHANGES;

Now we add a new stream called brand_products, which carries a brand_id field but no tenant_id.

CREATE STREAM "brand_products"
WITH (
    kafka_topic = 'store.public.brand_products',
    value_format = 'avro'
);

We can enrich the brand_products with tenant_id using the following join query:

CREATE STREAM "enriched_brand_products"
WITH (
    kafka_topic = 'enriched_brand_products'
) AS
    SELECT
        "brand"."id" as "brand_id",
        "brand"."tenant_id" as "tenant_id",
        CAST(brand_product.id AS VARCHAR) as "id",
        brand_product.name AS "name"
    FROM
        "brand_products" AS brand_product
    INNER JOIN "brands_table" "brand"
    ON
        brand_product.brand_id = "brand"."id"
    PARTITION BY
        CAST(brand_product.id AS VARCHAR)
    EMIT CHANGES;

Schema Registry: A layer on top of Kafka that stores the metadata of the events you ingest into Kafka. It is based on Avro schemas and provides a REST interface for storing and retrieving them. It helps enforce schema compatibility checks and manage schema evolution over time.
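As a quick illustration of that REST interface (a sketch; the subject name follows the topic naming used later in this post, and the Avro schema shown is a hypothetical, trimmed-down one), registering and fetching a schema looks roughly like this once the registry is running:

# Register a new Avro schema version under a subject
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"Brand\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"}' \
  http://localhost:8081/subjects/store.public.brands-value/versions

# List all registered subjects
curl http://localhost:8081/subjects

# Fetch the latest schema version for that subject
curl http://localhost:8081/subjects/store.public.brands-value/versions/latest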

Configuring the Stack

We use Docker and docker-compose to configure and deploy our services. Below are the service definitions from a docker-compose file that runs Postgres, Elasticsearch, and the Kafka-related services; I’ll explain each of them as we go.

Postgres and Elasticsearch

postgres:
    build: services/postgres
    container_name: oeso_postgres
    volumes:
      - database:/var/lib/postgresql/data
    env_file:
      - .env
    ports:
      - 5432:5432
    networks:
      - project_network

Docker-compose service for Postgres

 

elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.10.0
    container_name: elasticsearch
    volumes:
      - ./services/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml:ro
      - elasticsearch-database:/usr/share/elasticsearch/data
    env_file:
      - .env
    ports:
      - "9200:9200"
      - "9300:9300"
    networks:
      - project_network

Docker-compose service for Elasticsearch

 

To stream events out of the source database, we need to enable logical decoding so that changes can be replicated from its logs. In Postgres, these logs are the Write-Ahead Logs (WAL), and they are written to files. We also need a logical decoding output plugin, in our case wal2json, to extract easy-to-read information about persistent database changes so that it can be emitted to Kafka as events.

To set up the required extensions, you may refer to this Postgres Dockerfile.
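As a rough, minimal sketch of what enabling logical decoding usually involves on the Postgres side (illustrative values; the wal2json package itself is installed in the image):

# postgresql.conf -- illustrative values
wal_level = logical           # write logical decoding information to the WAL
max_wal_senders = 4           # allow replication connections (Debezium uses one)
max_replication_slots = 4     # each Debezium connector holds one replication slot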

For both Elasticsearch and Postgres, we specify the necessary variables in the environment file to set them up with a username, password, etc.
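The variable names below are the ones referenced by the compose file and by the setup script later in this post; the values are placeholders to replace in your own .env:

# .env -- placeholder values
POSTGRES_USER=store_user
POSTGRES_PASSWORD=change_me
POSTGRES_DB=store
ELASTIC_PASSWORD=change_me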

Zookeeper

zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - project_network

ZooKeeper acts as a centralized coordination service for distributed platforms like Kafka; it stores cluster metadata such as the status of Kafka nodes and keeps track of topics and partitions.

There are plans to run Kafka without ZooKeeper, but for now it is a necessary requirement for managing the cluster.
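If you want to peek at what ZooKeeper is tracking once the stack is up, a quick optional check might look like this (assuming the Kafka scripts are on the zookeeper container's PATH):

# List the broker ids currently registered in ZooKeeper
docker exec -it zookeeper zookeeper-shell localhost:2181 ls /brokers/ids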

Kafka Broker

broker:
    image: confluentinc/cp-enterprise-kafka:6.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    networks:
      - project_network

For the sake of simplicity, we set up a single-node Kafka cluster here. I’ll discuss multi-broker clusters in Part 2 of this series.

It is particularly important to understand some configurations we made here for the Kafka broker.

Listeners

Since Kafka is designed as a distributed platform, we need a way for brokers to communicate with each other internally and with clients externally, depending on your network structure. We do this with listeners; a listener is a combination of host, port, and protocol.

KAFKA_LISTENERS: A list of host/port/protocol combinations that the Kafka broker binds to. By default the host is 0.0.0.0, i.e. it listens on all interfaces.

KAFKA_ADVERTISED_LISTENERS: Again a combination of host and port, but this is the address the client is told to use when connecting to a Kafka broker. If the client is inside Docker, it can connect to the broker using broker:9092; a client outside Docker is returned localhost:29092 to make the connection. We also need to specify the listener name, which is then mapped to an appropriate protocol to make the connection.

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: Here we map each user-defined listener name to the protocol to use for communication; it can be either PLAINTEXT (unencrypted) or SSL (encrypted). These names are then used in KAFKA_LISTENERS and KAFKA_ADVERTISED_LISTENERS to pair the appropriate protocol with each host/IP.

Since we are configuring only a single-node Kafka cluster, the returned, or rather the advertised, address for any client is that of this same broker.
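To make the listener setup concrete, here is a sketch of reaching the same broker from each side of the Docker network (assuming the Kafka CLI tools are available wherever you run them):

# From another container on project_network -- internal PLAINTEXT listener
kafka-topics --bootstrap-server broker:9092 --list

# From the host machine -- advertised PLAINTEXT_HOST listener
kafka-topics --bootstrap-server localhost:29092 --list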

Schema-Registry

schema-registry:
    image: confluentinc/cp-schema-registry:6.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
    networks:
      - project_network

For a single-node Schema Registry, we specify the ZooKeeper connection string used by the Kafka cluster, so the registry can find the cluster in which it stores its schema-related data.

Kafka-Connect

connect:
    image: confluentinc/cp-kafka-connect:6.0.0
    hostname: connect
    container_name: connect
    volumes:
      - "./producers/debezium-debezium-connector-postgresql/:/usr/share/confluent-hub-components/debezium-debezium-connector-postgresql/"
      - "./consumers/confluentinc-kafka-connect-elasticsearch/:/usr/share/confluent-hub-components/confluentinc-kafka-connect-elasticsearch/"
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      KAFKA_HEAP_OPTS: "-Xms256M -Xmx512M"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.5.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    networks:
      - project_network

We see some new parameters like:

→ CONNECT_BOOTSTRAP_SERVERS: A set of host and port combinations that are used for establishing the initial connection to the Kafka cluster.

→ CONNECT_KEY_CONVERTER: Used to serialize keys from Connect’s internal format into a format compatible with Kafka. Similarly, for CONNECT_VALUE_CONVERTER we use the AvroConverter to serialize values.

It’s important to mount volumes for our source and sink connector plugins and to include those paths in CONNECT_PLUGIN_PATH.
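For reference, the two connector configuration files our init script posts later (connectors/postgres.json and connectors/elasticsearch.json) look roughly like the sketches below. The property names belong to the Debezium Postgres source and the Confluent Elasticsearch sink connectors; the uppercase placeholders are the strings the setup script's sed commands substitute, and the exact option set may differ for your connector versions. connectors/postgres.json:

{
  "name": "event_reader",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "POSTGRES_USER",
    "database.password": "POSTGRES_PASSWORD",
    "database.dbname": "POSTGRES_DB",
    "database.server.name": "store",
    "plugin.name": "wal2json",
    "table.include.list": "public.brands,public.brand_products"
  }
}

And connectors/elasticsearch.json:

{
  "name": "enriched_writer",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "connection.username": "elastic",
    "connection.password": "ELASTIC_PASSWORD",
    "topics": "enriched_brands,enriched_brand_products",
    "key.ignore": "false"
  }
}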

ksqlDB

ksqldb-server:
    image: confluentinc/cp-ksqldb-server:6.0.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8088:8088"
    volumes:
      - "./producers/debezium-debezium-connector-postgresql/:/usr/share/kafka/plugins/debezium-debezium-connector-postgresql/"
      - "./consumers/confluentinc-kafka-connect-elasticsearch/:/usr/share/kafka/plugins/confluentinc-kafka-connect-elasticsearch/"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_STREAMS_MAX_TASK_IDLE_MS: 2000
      KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "ksql-connect-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "ksql-connect-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "ksql-connect-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"
    networks:
      - project_network

If you don’t intend to run a separate Kafka Connect cluster, and there is no need to scale Connect independently of ksqlDB, you can set the embedded-Connect configuration for ksqlDB, which also exposes the Connect endpoints from the ksqldb-server.
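With embedded Connect enabled, connectors can also be created directly from ksqlDB using its CREATE SOURCE CONNECTOR / CREATE SINK CONNECTOR statements. A hedged sketch for the Debezium source, with placeholder values mirroring the connector configuration sketched in the Kafka Connect section:

CREATE SOURCE CONNECTOR "event_reader" WITH (
    'connector.class'      = 'io.debezium.connector.postgresql.PostgresConnector',
    'database.hostname'    = 'postgres',
    'database.port'        = '5432',
    'database.user'        = 'POSTGRES_USER',
    'database.password'    = 'POSTGRES_PASSWORD',
    'database.dbname'      = 'POSTGRES_DB',
    'database.server.name' = 'store',
    'plugin.name'          = 'wal2json'
);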

Other than that, there’s an environment variable that needs some consideration:

→ KSQL_KSQL_STREAMS_MAX_TASK_IDLE_MS: In current versions of ksqlDB, stream-table joins can be non-deterministic, i.e. you may not get a successful join if, in real time, the table-side event isn’t created or updated before the stream event arrives. Setting this variable makes the task wait a little for the table side to catch up when a stream event arrives for a particular timestamp. This improves join predictability, but may cause some performance degradation. Efforts are being made to improve this here.

If the above isn’t completely clear, I suggest using this config for now, as it just works; time synchronization really deserves another post to discuss in detail. If you’re still curious, you can watch this video by Matthias J. Sax, from Confluent.

ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:6.0.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    networks:
      - project_network

The ksqldb-cli service is handy for trying out and testing streams in testing or development environments, and even in production if you want to explore event streams or KTables, or manually create or filter streams. That said, it is recommended to automate stream, table, and topic creation using the ksql or Kafka clients or their REST endpoints, which we discuss below.
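For example, you can attach to the CLI container and point it at the server:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

Then, inside the ksql prompt, you can explore interactively (the quoted stream name matches the lowercase names created in this post):

SHOW STREAMS;
SHOW TABLES;
SELECT * FROM "enriched_brand_products" EMIT CHANGES LIMIT 5;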

Search-Detailed

A more detailed look at our architecture, so far.

Initializing data streams

streams-init:
    build: jobs/streams-init
    container_name: streams-init
    depends_on:
      - zookeeper
      - broker
      - schema-registry
      - ksqldb-server
      - ksqldb-cli
      - postgres
      - elasticsearch
      - connect
    env_file:
      - .env
    environment:
      ZOOKEEPER_HOSTS: "zookeeper:2181"
      KAFKA_TOPICS: "brands, brand_products"
    networks:
      - project_network

The purpose of this service is to initialize the streams and configure Kafka and the other services we use. At deployment time, we don’t want to manually create topics, streams, connections, etc. on the server, so we use the REST endpoints each service offers and write a shell script to automate the process.

Our setup script looks something like this:

#!/bin/bash

# Setup ENV variables in connectors json files
sed -i "s/POSTGRES_USER/${POSTGRES_USER}/g" connectors/postgres.json
sed -i "s/POSTGRES_PASSWORD/${POSTGRES_PASSWORD}/g" connectors/postgres.json
sed -i "s/POSTGRES_DB/${POSTGRES_DB}/g" connectors/postgres.json
sed -i "s/ELASTIC_PASSWORD/${ELASTIC_PASSWORD}/g" connectors/elasticsearch.json

# Wait until Kafka, ZooKeeper, and the other required services are reachable.
export WAIT_HOSTS=zookeeper:2181,broker:9092,schema-registry:8081,ksqldb-server:8088,elasticsearch:9200,connect:8083
export WAIT_HOSTS_TIMEOUT=300
/wait

# Parse string of kafka topics into an array
# https://stackoverflow.com/a/10586169/4587961
kafkatopicsArrayString="$KAFKA_TOPICS"
IFS=', ' read -r -a kafkaTopicsArray <<< "$kafkatopicsArrayString"

# A separate variable for zookeeper hosts.
zookeeperHostsValue=$ZOOKEEPER_HOSTS

# Terminate all queries
curl -s -X "POST" "http://ksqldb-server:8088/ksql" \
         -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
         -d '{"ksql": "SHOW QUERIES;"}' | \
  jq '.[].queries[].id' | \
  xargs -Ifoo curl -X "POST" "http://ksqldb-server:8088/ksql" \
           -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
           -d '{"ksql": "TERMINATE 'foo';"}'
           

# Drop All Tables
curl -s -X "POST" "http://ksqldb-server:8088/ksql" \
             -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
             -d '{"ksql": "SHOW TABLES;"}' | \
      jq '.[].tables[].name' | \
      xargs -Ifoo curl -X "POST" "http://ksqldb-server:8088/ksql" \
               -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
               -d '{"ksql": "DROP TABLE \"foo\";"}'


# Drop All Streams
curl -s -X "POST" "http://ksqldb-server:8088/ksql" \
           -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
           -d '{"ksql": "SHOW STREAMS;"}' | \
    jq '.[].streams[].name' | \
    xargs -Ifoo curl -X "POST" "http://ksqldb-server:8088/ksql" \
             -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
             -d '{"ksql": "DROP STREAM \"foo\";"}'
             

# Create kafka topic for each topic item from split array of topics.
for newTopic in "${kafkaTopicsArray[@]}"; do
    # https://kafka.apache.org/quickstart
    curl -X DELETE http://elasticsearch:9200/enriched_$newTopic --user elastic:${ELASTIC_PASSWORD}
    curl -X DELETE http://schema-registry:8081/subjects/store.public.$newTopic-value
    kafka-topics --create --topic "store.public.$newTopic" --partitions 1 --replication-factor 1 --if-not-exists --zookeeper "$zookeeperHostsValue"
    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data @schemas/$newTopic.json http://schema-registry:8081/subjects/store.public.$newTopic-value/versions

done

curl -X "POST" "http://ksqldb-server:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d $'{ "ksql": "CREATE STREAM \\"brands\\" WITH (kafka_topic = \'store.public.brands\', value_format = \'avro\');", "streamsProperties": {} }'
curl -X "POST" "http://ksqldb-server:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d $'{ "ksql": "CREATE STREAM \\"enriched_brands\\" WITH ( kafka_topic = \'enriched_brands\' ) AS SELECT CAST(brand.id AS VARCHAR) as \\"id\\", brand.tenant_id as \\"tenant_id\\", brand.name as \\"name\\" from \\"brands\\" brand partition by CAST(brand.id AS VARCHAR) EMIT CHANGES;", "streamsProperties": {} }'

curl -X "POST" "http://ksqldb-server:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d $'{ "ksql": "CREATE STREAM \\"brand_products\\" WITH ( kafka_topic = \'store.public.brand_products\', value_format = \'avro\' );", "streamsProperties": {} }'
curl -X "POST" "http://ksqldb-server:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d $'{ "ksql": "CREATE TABLE \\"brands_table\\" AS SELECT id as \\"id\\", latest_by_offset(tenant_id) as \\"tenant_id\\" FROM \\"brands\\" group by id EMIT CHANGES;", "streamsProperties": {} }'
curl -X "POST" "http://ksqldb-server:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d $'{ "ksql": "CREATE STREAM \\"enriched_brand_products\\" WITH ( kafka_topic = \'enriched_brand_products\' ) AS SELECT \\"brand\\".\\"id\\" as \\"brand_id\\", \\"brand\\".\\"tenant_id\\" as \\"tenant_id\\", CAST(brand_product.id AS VARCHAR) as \\"id\\", brand_product.name AS \\"name\\" FROM \\"brand_products\\" AS brand_product INNER JOIN \\"brands_table\\" \\"brand\\" ON brand_product.brand_id = \\"brand\\".\\"id\\" partition by CAST(brand_product.id AS VARCHAR) EMIT CHANGES;", "streamsProperties": {} }'

curl -X DELETE http://connect:8083/connectors/enriched_writer
curl -X "POST" -H "Content-Type: application/json" --data @connectors/elasticsearch.json http://connect:8083/connectors

curl -X DELETE http://connect:8083/connectors/event_reader
curl -X "POST" -H "Content-Type: application/json" --data @connectors/postgres.json http://connect:8083/connectors

This is what currently works for us:

→ We make sure all the services are ready before we run any jobs over them;
→ We need to be sure the topics exist on Kafka, or we create new ones;
→ Our streams should be functional even if there’s any schema update;
→ Connectors are re-created, to account for password or version changes in the underlying data sources or sinks.

I’m sharing this setup script only to demonstrate a way to automate these pipelines. The exact same setup might not work for you, but the idea remains the same: automate your workflows and avoid manual work on every deployment, in any environment.

To get this data infrastructure up and running really quickly, refer to this GitHub repository.

So, clone the repository and run:

cp default.env .env

docker-compose up -d

...in your terminal.
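If you want a SQL prompt inside the containerized Postgres, one way is to load the env file locally and exec into the container (assuming a plain KEY=value .env and that the image ships psql):

source .env
docker exec -it oeso_postgres psql -U "$POSTGRES_USER" -d "$POSTGRES_DB"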

Create brands and brand_products tables inside the store Postgres database:

CREATE TABLE brands (
    id serial PRIMARY KEY,
    name VARCHAR (50),
    tenant_id INTEGER
);

CREATE TABLE brand_products (
    id serial PRIMARY KEY,
    brand_id INTEGER,
    name VARCHAR(50)
);

Insert some records in the brands table:

INSERT INTO brands VALUES(1, 'Brand Name 1', 1);  
INSERT INTO brands VALUES(2, 'Brand Name 2', 1);  
INSERT INTO brands VALUES(3, 'Brand Name 3', 2);  
INSERT INTO brands VALUES(4, 'Brand Name 4', 2);

And some records in the brand_products table:

INSERT INTO brand_products VALUES(1, 1, 'Product Name 1');  
INSERT INTO brand_products VALUES(2, 2, 'Product Name 2');  
INSERT INTO brand_products VALUES(3, 3, 'Product Name 3');  
INSERT INTO brand_products VALUES(4, 4, 'Product Name 4');  
INSERT INTO brand_products VALUES(5, 1, 'Product Name 5');

See the brand_products getting enriched with tenant_id in Elasticsearch:

curl localhost:9200/enriched_brand_products/_search --user elastic:your_password
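A hit in the response should carry the denormalized fields produced by the enriched_brand_products stream; roughly something like this, with metadata fields trimmed and values illustrative of the sample inserts above:

{
  "_index": "enriched_brand_products",
  "_id": "1",
  "_source": {
    "brand_id": 1,
    "tenant_id": 1,
    "id": "1",
    "name": "Product Name 1"
  }
}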

I’ll keep contributing to the above repository: adding deployment configurations for a multi-node Kafka infrastructure on Kubernetes, writing more connectors, and implementing a framework for a plug-and-play architecture that uses only the desired services. Feel free to contribute to it, or let me know about any data engineering problems you face in your current setup, here.

Next Steps

I hope this post gives you a fair idea of deploying and running a complete Kafka stack for a basic yet effective use case: building real-time stream processing applications.

Depending on the nature of your product or company, your deployment process may vary to suit your requirements. I do plan to address the scalability aspects of such a system in the next part of this series, which will cover deploying this infrastructure on Kubernetes for exactly the same use case.