Long gone are the days where developers would write code to process data only in batches. In the modern landscape of applications, the use of streaming data to capture and process streams of events as they happen is increasingly common, and fully managed cloud services like Amazon MSK surely provide an excellent solution for this. Amazon MSK offers Apache Kafka as a service, removing the operational complexities from developers, enabling them to focus on solving problems instead of infrastructure plumbing. Though you can use the AWS Console to get your Kafka clusters up and running in minutes, it is often a better idea to automate the lifecycle of your clusters using infrastructure-as-code. In this article, I will explain how to get Kafka clusters up and running with Amazon MSK using Terraform, so you can use the approach shown here to build your own deployment code.
đź’ˇ You can find the complete code from this tutorial on GitHub.
If you need a primer on how to build deployment code using Terraform for AWS, you can follow this tutorial provided by HashiCorp. It will give the first steps you need.
Getting things started
For the first step, create a variable that will be used throughout the code to reference what you are trying to accomplish here, which is creating your own Apache Kafka cluster.
variable "global_prefix" {
type = string
default = "my-own-apache-kafka-cluster"
}
We will call this variable global_prefix
to remember anyone that it is being used throughout the whole code. It provides the right catch for whoever tries to change it in the future that it may break things. The next step is to create a AWS KMS key for the Kafka cluster. This is important because Amazon MSK provides encryption at rest for the data, and having a key to manage that is a best practice.
resource "aws_kms_key" "kafka_kms_key" {
description = "Key for Apache Kafka"
}
The next step is to create a way for you to customize your Kafka cluster options. Amazon MSK supports the concept of custom configuration for Kafka clusters. This allows you to change which parameters are going to be written into the server.properties
file of each broker of the cluster.
resource "aws_msk_configuration" "kafka_config" {
kafka_versions = ["2.6.2"]
name = "${var.global_prefix}-config"
server_properties = <<EOF
auto.create.topics.enable = true
delete.topic.enable = true
EOF
}
Here, we are telling with the configuration provided we want to enable automatic topic creation in the Kafka cluster. This means that whenever developers write and read data from topics, they will be created automatically in the cluster without the need to issue administrative commands using the CLI or the Admin API. Similarly, we are enabling the deletion of topics, so the cluster won't reject any commands issued to delete topics.
Having these configurations are great because it allows you to have better control of your clusters, as they are managed separately from the cluster. You can share the same configuration with different Kafka clusters, or have each cluster with their own configuration. Here is the part where defining this via infrastructure-as-code can bring a huge flexibility to your deployments.
Now, for the moment we were all waiting for. Here is how you create a Kafka cluster on Amazon MSK.
resource "aws_msk_cluster" "kafka" {
cluster_name = var.global_prefix
kafka_version = "2.6.2"
number_of_broker_nodes = 3
broker_node_group_info {
instance_type = "kafka.m5.large"
storage_info {
ebs_storage_info {
volume_size = 1000
}
}
client_subnets = [aws_subnet.private_subnet[0].id,
aws_subnet.private_subnet[1].id,
aws_subnet.private_subnet[2].id]
security_groups = [aws_security_group.kafka.id]
}
encryption_info {
encryption_in_transit {
client_broker = "PLAINTEXT"
}
encryption_at_rest_kms_key_arn = aws_kms_key.kafka_kms_key.arn
}
configuration_info {
arn = aws_msk_configuration.kafka_config.arn
revision = aws_msk_configuration.kafka_config.latest_revision
}
tags = {
name = var.global_prefix
}
}
There is a lot going on here with this code; so before moving further with the implementation, let's understand what this code actually does.
- We set the name of the cluster to be what we set into the
global_prefix
variable. - We set the Kafka version to
2.6.2
. You can check the supported versions here. - This cluster will have three nodes, each one using the
kafka.m5.large
instance type. - We configured each broker to use an AWS EBS volume, with one terabyte of capacity.
- This cluster will run in private subnets and use a custom security group.
- Encryption in transit and at rest was enabled. For at rest, we used the custom KMS key.
- We have associated the custom configuration created before to this Kafka cluster.
This summarizes the obvious part of the code. Now let's discuss the non-obvious part.
Networking behind scenes
It was said that this cluster will run in private subnets. This is important because Kafka is a persistent layer for applications and microservices; and just like you would do with any other data store, it is a best practice to isolate the resource in private subnets. For this reason, you will need to create three subnets in a given VPC, associate a valid CIDR block for each, and map them to availability zones. Here is a quick way to do this.
data "aws_availability_zones" "available" {
state = "available"
}
variable "private_cidr_blocks" {
type = list(string)
default = [
"10.0.1.0/24",
"10.0.2.0/24",
"10.0.3.0/24",
]
}
resource "aws_subnet" "private_subnet" {
count = 3
vpc_id = aws_vpc.your_vpc.id
cidr_block = element(var.private_cidr_blocks, count.index)
map_public_ip_on_launch = false
availability_zone = data.aws_availability_zones.available.names[count.index]
tags = {
Name = "${var.global_prefix}-private-subnet-${count.index}"
}
}
For this to work, you will also need to create a route table to direct traffic, and associate each private subnet to this route table.
resource "aws_route_table" "private_route_table" {
vpc_id = aws_vpc.your_vpc.id
tags = {
Name = "${var.global_prefix}-private-route-table"
}
}
resource "aws_route_table_association" "private_subnet_association" {
count = length(data.aws_availability_zones.available.names)
subnet_id = element(aws_subnet.private_subnet.*.id, count.index)
route_table_id = aws_route_table.private_route_table.id
}
Finally, you need to create a security group for the Kafka cluster. This is required because you want to allow ingress traffic to the cluster over the exposed port 9092
, and this traffic needs to be enabled for all private subnets.
resource "aws_security_group" "kafka" {
name = "${var.global_prefix}-kafka"
vpc_id = aws_vpc.your_vpc.id
ingress {
from_port = 0
to_port = 9092
protocol = "TCP"
cidr_blocks = ["10.0.1.0/24",
"10.0.2.0/24",
"10.0.3.0/24"]
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
tags = {
Name = "${var.global_prefix}-kafka"
}
}
That's it. Everything you need to create your Kafka cluster is in place. Now you can ask Terraform to run its magic:
terraform apply -auto-approve
After a few minutes, you will have your Kafka cluster up and running.
Going above and beyond
If you want to call the day and be done with the Terraform coding, you are good. However, there are a couple of things you can do extra with the Kafka cluster that may prove useful to you in the future, especially if you need to troubleshoot things. The first one is enabling logs in the Kafka cluster.
Having access to your Kafka cluster logs is important if you want to understand certain behaviors introduced by parameters in the configuration, or even how the Kafka cluster is reacting to the connections from producers and consumers. To enable logging, first you need to create an Amazon CloudWatch log group.
resource "aws_cloudwatch_log_group" "kafka_log_group" {
name = "kafka_broker_logs"
}
Then you have to update the definition of the Kafka cluster resource to include the logging support, and direct the logging to the created log group.
resource "aws_msk_cluster" "kafka" {
cluster_name = var.global_prefix
kafka_version = "2.6.2"
number_of_broker_nodes = 3
broker_node_group_info {
instance_type = "kafka.m5.large"
storage_info {
ebs_storage_info {
volume_size = 1000
}
}
client_subnets = [aws_subnet.private_subnet[0].id,
aws_subnet.private_subnet[1].id,
aws_subnet.private_subnet[2].id]
security_groups = [aws_security_group.kafka.id]
}
encryption_info {
encryption_in_transit {
client_broker = "PLAINTEXT"
}
encryption_at_rest_kms_key_arn = aws_kms_key.kafka_kms_key.arn
}
configuration_info {
arn = aws_msk_configuration.kafka_config.arn
revision = aws_msk_configuration.kafka_config.latest_revision
}
// Support for logging
logging_info {
broker_logs {
cloudwatch_logs {
enabled = true
log_group = aws_cloudwatch_log_group.kafka_log_group.name
}
}
}
tags = {
name = var.global_prefix
}
}
Apply this new version of the code with Terraform, and you will be able to visualize the logs of each Kafka broker from the cluster using CloudWatch.
The second change is creating a bastion server. This is a server you can use to connect to the Kafka cluster and run a few commands for testing and troubleshooting purposes, such as the CLI tools from Kafka. A bastion server (also known as a jump server) is a way for you to have access to a resource sitting in a private subnet from the public internet. The key idea is to allow this access without comprising the security best practices related to resource isolation. The bastion server must be the only one with access to the Kafka cluster, and you must restrict as much as possible who can connect to the bastion server.
Let's start by defining a security group for the bastion server.
resource "aws_security_group" "bastion_host" {
name = "${var.global_prefix}-bastion-host"
vpc_id = aws_vpc.your_vpc.id
ingress {
from_port = 22
to_port = 22
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
tags = {
Name = "${var.global_prefix}-bastion-host"
}
}
In this example, we are allowing access to any computer over the internet to establish SSH connection via the port 22
to the bastion server. For testing purposes, this is okay. However, the CIDR block for ingress should be restricted to only a few machines, such as the public IP address of your own machine.
The next step is to update the security group created for the Kafka cluster to include an additional ingress rule.
resource "aws_security_group" "kafka" {
name = "${var.global_prefix}-kafka"
vpc_id = aws_vpc.your_vpc.id
ingress {
from_port = 0
to_port = 9092
protocol = "TCP"
cidr_blocks = ["10.0.1.0/24",
"10.0.2.0/24",
"10.0.3.0/24"]
}
// Additional ingress rule
ingress {
from_port = 0
to_port = 9092
protocol = "TCP"
cidr_blocks = ["10.0.4.0/24"]
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
tags = {
Name = "${var.global_prefix}-kafka"
}
}
Note that the additional ingress rule specifies that any ingress traffic coming trying to reach the port 9092
over TCP is allowed, as long this traffic is coming from an IP address that belongs to the CIDR block 10.0.4.0/24
. This is where we will instruct Terraform to create the bastion server.
You are also going to need a public subnet for the bastion server. Create a public subnet using the CIDR block 10.0.4.0/24
and bind that subnet to one availability zone.
resource "aws_subnet" "bastion_host_subnet" {
vpc_id = aws_vpc.your_vpc.id
cidr_block = "10.0.4.0/24"
map_public_ip_on_launch = true
availability_zone = data.aws_availability_zones.available.names[0]
tags = {
Name = "${var.global_prefix}-bastion-host"
}
}
Now you can tell Terraform to create a bastion server with an Amazon EC2 instance.
resource "aws_instance" "bastion_host" {
depends_on = [aws_msk_cluster.kafka]
ami = data.aws_ami.amazon_linux_2.id
instance_type = "t2.micro"
key_name = aws_key_pair.private_key.key_name
subnet_id = aws_subnet.bastion_host_subnet.id
vpc_security_group_ids = [aws_security_group.bastion_host.id]
user_data = templatefile("bastion.tftpl", {
bootstrap_server_1 = split(",", aws_msk_cluster.kafka.bootstrap_brokers)[0]
bootstrap_server_2 = split(",", aws_msk_cluster.kafka.bootstrap_brokers)[1]
bootstrap_server_3 = split(",", aws_msk_cluster.kafka.bootstrap_brokers)[2]
})
root_block_device {
volume_type = "gp2"
volume_size = 100
}
tags = {
Name = "${var.global_prefix}-bastion-host"
}
}
This server will be created right after the Kafka cluster is created, as you can see in the depends_on
clause. This is going to be a t2.micro
instance type with Amazon Linux as operating system. The exact AWS AMI used for the operating system is retrieved by a data source that gets the latest release of Amazon Linux. This is the data source you must use.
data "aws_ami" "amazon_linux_2" {
most_recent = true
owners = ["amazon"]
filter {
name = "owner-alias"
values = ["amazon"]
}
filter {
name = "name"
values = ["amzn2-ami-hvm*"]
}
}
To connect via SSH from your machine to the bastion server, you will need the private key from the key pair associated with the bastion server. In the key_name
property of the bastion server, we have provided the name of the private key created for this bastion server. To create the key pair and private key, use the following code.
resource "tls_private_key" "private_key" {
algorithm = "RSA"
rsa_bits = 4096
}
resource "aws_key_pair" "private_key" {
key_name = var.global_prefix
public_key = tls_private_key.private_key.public_key_openssh
}
resource "local_file" "private_key" {
content = tls_private_key.private_key.private_key_pem
filename = "cert.pem"
}
resource "null_resource" "private_key_permissions" {
depends_on = [local_file.private_key]
provisioner "local-exec" {
command = "chmod 600 cert.pem"
interpreter = ["bash", "-c"]
on_failure = continue
}
}
This will create a private key using OpenSSH, and flush the contents of this private key into a local file on the machine where this Terraform code will be execute. This local file will be named cert.pem
.
You may have noticed that we provided something to the bastion server resource using the property user_data
. This is the custom bootstrap code that we want the bastion server to execute before opening itself for business. In this particular case, we want to update the dependencies of the bastion server, install an Apache Kafka distribution locally on the server so you can use the CLI tools from it, and also create a file with the bootstrap servers of the Kafka cluster. Create a new file locally called bastion.tftpl
, and add the following to it.
#!/bin/bash
yum update -y
yum install java-1.8.0 -y
yum install java-17-amazon-corretto-devel.x86_64 -y
wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
tar -xzf kafka_2.12-2.2.1.tgz
rm kafka_2.12-2.2.1.tgz
cat > /home/ec2-user/bootstrap-servers <<- "EOF"
${bootstrap_server_1}
${bootstrap_server_2}
${bootstrap_server_3}
EOF
echo "PATH=$PATH:/bin:/usr/local/bin:/usr/bin:/kafka_2.12-2.2.1/bin" >> /home/ec2-user/.bash_profile
The last change we need right now is a way for you to know which public address to use to establish a SSH connection from your machine. You can do this with the following output.
output "execute_this_to_access_the_bastion_host" {
value = "ssh ec2-user@${aws_instance.bastion_host.public_ip} -i cert.pem"
}
It will print the exact command that you need to establish the SSH connection to the bastion server. Apply this new version of the code with Terraform. You should see the following output.
Apply complete! Resources: 7 added, 1 changed, 0 destroyed.
Outputs:
execute_this_to_access_the_bastion_host = "ssh ec2-user@<PUBLIC_IP> -i cert.pem“
Once you successfully connect to the bastion server, you can play with your Kafka cluster as if you were connecting from the private subnet where it currently executes.
[ec2-user@ip-10-0-4-150 ~]$ more bootstrap-servers
b-1.myownapachekafkacluste.bhyfwg.c3.kafka.us-east-1.amazonaws.com:9092
b-2.myownapachekafkacluste.bhyfwg.c3.kafka.us-east-1.amazonaws.com:9092
b-3.myownapachekafkacluste.bhyfwg.c3.kafka.us-east-1.amazonaws.com:9092
[ec2-user@ip-10-0-4-150 ~]$ kafka-console-consumer.sh --bootstrap-server b-1.myownapachekafkacluste.bhyfwg.c3.kafka.us-east-1.amazonaws.com:9092 --topic <TOPIC_NAME> --from-beginning
Remember that whenever you feel that you have played enough with the Kafka cluster, don't forget to destroy all the resources created to avoid ending up with an undesired bill to pay.
terraform destroy -auto-approve
Unless, of course, this is for a production environment. Then in this case, go out and celebrate a successful deployment powered by Apache Kafka, Terraform, and AWS. After the celebration, you may want to consider setting up a CI/CD pipeline for the Terraform code. It will help you to avoid getting things messy very quickly when multiple developers are working in the same code.
Are you curious to know how? Here is a nice tutorial that walks you through step by step.