Apache Kafka has been growing in popularity as the de facto way to share streams of events with different systems. While Kafka itself provides durable, log-based storage for events, Kafka Connect provides the framework to build connectors capable of reading data from sources into Kafka and sharing data that already exists in Kafka with the rest of the world. Design patterns like Change Data Capture find in Kafka the right ecosystem to be implemented successfully, and this is one of the reasons companies all around the world are adopting it massively.
However, developers often complain that implementing use cases with Kafka Connect in the cloud is not as easy as developing their projects locally. While developing locally, container technologies like Docker and Docker Compose take care of much of the plumbing, to the point where most of the issues with Kafka Connect become imperceptible. In the cloud, however, the architecture, constraints, and dependencies are different, and those supposedly imperceptible issues pop up. This is where fully managed cloud services such as Amazon MSK Connect prove to be extremely useful. In this article, I will explain how to deploy a Kafka Connect connector to the Amazon MSK Connect service using Terraform.
💡 You can find the complete code from this tutorial on GitHub.
This tutorial focuses on how to deploy a Kafka Connect connector on Amazon MSK Connect. For this to work, you will need a Kafka cluster; you can follow the steps shown in this other tutorial to create one, also using Terraform.
Getting things started
As a first step, create a variable that will be used throughout the code to reference what you are trying to accomplish here: deploying your first Kafka Connect connector.
variable "my_1st_kf_connector" {
type = string
default = "my-first-kafka-connector"
}
Then, create an Amazon S3 bucket. This bucket will be used to store the implementation of the Kafka Connect connector. When we eventually deploy the connector, its code must already exist in this bucket for the deployment to be successful.
resource "aws_s3_bucket" "connector_bucket" {
bucket = "${var.my_1st_kf_connector}-${random_string.random_string.result}"
}
To avoid name clashes between different developers running the same Terraform code, we used a random string as part of the bucket name. In AWS, bucket names must be globally unique, because buckets are globally addressable. This is how you create a random string in Terraform.
resource "random_string" "random_string" {
length = 8
special = false
upper = false
lower = true
numeric = false
}
Now that we have a bucket to work with, we need to add an object to that bucket with the code for the connector. The snippet below shows how to upload a JAR file containing the connector's code into the bucket created before.
resource "aws_s3_object" "connector_code" {
bucket = aws_s3_bucket.connector_bucket.bucket
key = "${var.my_1st_kf_connector}.jar"
content_type = "application/java-archive"
source = "../target/${var.my_1st_kf_connector}-1.0.jar"
}
Note that we are assuming that a JAR file exists in the local folder ../target. For this tutorial to work, you will need to provide a valid JAR file containing the implementation code of a Kafka Connect connector. If you don't have one already, you can use the code provided by this repository:
🧑🏻💻 https://github.com/build-on-aws/building-apache-kafka-connectors
The instructions to build the code from this repository and generate a JAR file are available in the Building the connector section of the README page. The short version is that you need Java and Maven installed on your local machine, and then you run the command mvn clean package to generate the JAR file. Let me know in the comments if you need help with that.
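If it helps, here is roughly what that looks like end to end, assuming Java and Maven are already installed (the clone directory name simply follows from the repository URL above):

# Clone the example connector repository and build it with Maven.
git clone https://github.com/build-on-aws/building-apache-kafka-connectors.git
cd building-apache-kafka-connectors

# The build produces a JAR file under target/; make sure the "source"
# attribute of the aws_s3_object resource shown earlier points at it.
mvn clean package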
If you ask Terraform to run its magic, in a matter of minutes you will have the connector code uploaded to AWS and stored in an Amazon S3 bucket.
terraform apply -auto-approve
Deploying the connector
Now that we have the connector code properly uploaded into Amazon S3, we can create a custom plugin. A custom plugin is how Amazon MSK Connect refers to the Kafka Connect connector's implementation.
resource "aws_mskconnect_custom_plugin" "plugin" {
name = "${var.my_1st_kf_connector}-plugin"
content_type = "JAR"
location {
s3 {
bucket_arn = aws_s3_bucket.connector_bucket.arn
file_key = aws_s3_object.connector_code.key
}
}
}
The actual connector can be declared as shown below. In Amazon MSK Connect, the connector is the entity that governs the deployment's lifecycle, how it scales up and down, and how it connects with the Kafka cluster.
resource "aws_mskconnect_connector" "connector" {
name = var.my_1st_kf_connector
kafkaconnect_version = "2.7.1"
capacity {
autoscaling {
mcu_count = 1
min_worker_count = 1
max_worker_count = 2
scale_in_policy {
cpu_utilization_percentage = 20
}
scale_out_policy {
cpu_utilization_percentage = 80
}
}
}
connector_configuration = {
"connector.class" = "tutorial.buildon.aws.streaming.kafka.MyFirstKafkaConnector"
"key.converter" = "org.apache.kafka.connect.converters.ByteArrayConverter"
"value.converter" = "org.apache.kafka.connect.json.JsonConverter"
"value.converter.schemas.enable" = "false"
"first.required.param" = "Kafka"
"second.required.param" = "Connect"
"tasks.max" = "3"
}
kafka_cluster {
apache_kafka_cluster {
bootstrap_servers = aws_msk_cluster.kafka.bootstrap_brokers
vpc {
security_groups = [aws_security_group.connector.id]
subnets = [aws_subnet.private_subnet[0].id,
aws_subnet.private_subnet[1].id,
aws_subnet.private_subnet[2].id]
}
}
}
kafka_cluster_client_authentication {
authentication_type = "NONE"
}
kafka_cluster_encryption_in_transit {
encryption_type = "PLAINTEXT"
}
plugin {
custom_plugin {
arn = aws_mskconnect_custom_plugin.plugin.arn
revision = aws_mskconnect_custom_plugin.plugin.latest_revision
}
}
service_execution_role_arn = aws_iam_role.connector_role.arn
}
In the capacity property of the connector, we defined the criteria for which scalability strategy we want the connector to follow. Kafka Connect's unit of execution is the task. Tasks are the logical threads that the runtime spins up to execute the connector's code. In this context, the runtime is the JVM process executing those threads; these processes are called workers. We configured an initial pool size of 1 worker that can grow up to 2 workers. The criteria for when the worker pool size will increase or decrease are based on CPU utilization: the scale-in policy was set to 20% of CPU, and the scale-out policy was set to 80% of CPU.
In the security_groups property, you must provide a security group that allows the connector to connect with the Kafka cluster, while also considering the other endpoints it may need to reach. Whether the connector is a source or a sink, it must be able to open connections with the systems it intends to read data from or write data to. A minimal sketch of such a security group follows.
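As a rough illustration, and assuming the VPC from the cluster tutorial is exposed as aws_vpc.my_vpc (a hypothetical name; use whatever your cluster code actually defines), the connector's security group could simply allow all outbound traffic so it can reach both the brokers and any external systems:

# Illustrative sketch: a security group for the connector allowing all
# outbound traffic. The VPC reference (aws_vpc.my_vpc) is a placeholder for
# the VPC created in the Kafka cluster tutorial.
resource "aws_security_group" "connector" {
  name   = "${var.my_1st_kf_connector}-connector-sg"
  vpc_id = aws_vpc.my_vpc.id

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}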
It is important to highlight the function of the service_execution_role_arn property. This is the AWS IAM role the connector will use to perform tasks such as interacting with Amazon S3 to perform CRUD operations, as well as effectively executing the Kafka Connect connector's code. Thus, you must create an AWS IAM role with a policy containing the right permissions.
resource "aws_iam_role" "connector_role" {
name = "${var.my_1st_kf_connector}-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
"Effect": "Allow",
"Principal": {
"Service": "kafkaconnect.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
})
}
resource "aws_iam_role_policy" "connector_role_policy" {
name = "${var.my_1st_kf_connector}-role-policy"
role = aws_iam_role.connector_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
"Effect": "Allow",
"Action": [
"s3:ListAllMyBuckets"
],
"Resource": "arn:aws:s3:::*"
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation",
"s3:DeleteObject"
],
"Resource": "arn:aws:s3:::*"
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:ListMultipartUploadParts",
"s3:ListBucketMultipartUploads"
],
"Resource": "*"
}
]
})
}
Run Terraform again. In a matter of minutes, you will have the Kafka Connect connector properly deployed on Amazon MSK Connect.
Enabling logging
One of the trickiest parts of working with Kafka Connect connectors is knowing whether they are working as expected. This is because they run in the background, as entities managed by the Kafka Connect runtime, and there is very limited visibility into them by default.
For instance, you could use the REST API of the Kafka Connect runtime to inspect details about the deployed connectors. But since Amazon MSK Connect is a fully managed cloud service, it does not make that API available for you to use. And even if you had access to it, there would be challenges. First, you would need to expose it outside of the private subnets the connector was deployed into. This means giving up a bit of the security best practice of resource isolation and worrying about opening ports to the outside world via security groups. Second, the API only provides limited visibility into the connector: you can check whether connectors are running or stopped, check the number of tasks available, and manage their lifecycle. This is useful, but it doesn't really tell you what is going on behind the scenes.
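For comparison, this is the kind of request you would issue against a self-managed Kafka Connect cluster, whose REST API listens on port 8083 by default; the connector name here is just the one used in this tutorial, and none of this is reachable on Amazon MSK Connect:

# Check the status of a connector through the Kafka Connect REST API
# (self-managed clusters only; Amazon MSK Connect does not expose it).
curl http://localhost:8083/connectors/my-first-kafka-connector/status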
A better way to look into your connectors is via logging. You need to enable logging in the connector and redirect the log output to a place where you can inspect it without giving up on security best practices. Let's see now how to enable logging in the deployed connector and visualize the logs with Amazon CloudWatch.
To enable logging, first you need to create an Amazon CloudWatch log group.
resource "aws_cloudwatch_log_group" "connector_log_group" {
name = "kafka_connect_logs"
}
Then you have to update the definition of the Kafka Connect connector resource to include logging support and direct the logs to the newly created log group.
resource "aws_mskconnect_connector" "connector" {
name = var.my_1st_kf_connector
kafkaconnect_version = "2.7.1"
capacity {
autoscaling {
mcu_count = 1
min_worker_count = 1
max_worker_count = 2
scale_in_policy {
cpu_utilization_percentage = 20
}
scale_out_policy {
cpu_utilization_percentage = 80
}
}
}
connector_configuration = {
"connector.class" = "tutorial.buildon.aws.streaming.kafka.MyFirstKafkaConnector"
"key.converter" = "org.apache.kafka.connect.converters.ByteArrayConverter"
"value.converter" = "org.apache.kafka.connect.json.JsonConverter"
"value.converter.schemas.enable" = "false"
"first.required.param" = "Kafka"
"second.required.param" = "Connect"
"tasks.max" = "1"
}
kafka_cluster {
apache_kafka_cluster {
bootstrap_servers = aws_msk_cluster.kafka.bootstrap_brokers
vpc {
security_groups = [aws_security_group.connector.id]
subnets = [aws_subnet.private_subnet[0].id,
aws_subnet.private_subnet[1].id,
aws_subnet.private_subnet[2].id]
}
}
}
kafka_cluster_client_authentication {
authentication_type = "NONE"
}
kafka_cluster_encryption_in_transit {
encryption_type = "PLAINTEXT"
}
plugin {
custom_plugin {
arn = aws_mskconnect_custom_plugin.plugin.arn
revision = aws_mskconnect_custom_plugin.plugin.latest_revision
}
}
// Support for logging
log_delivery {
worker_log_delivery {
cloudwatch_logs {
enabled = true
log_group = aws_cloudwatch_log_group.connector_log_group.name
}
}
}
service_execution_role_arn = aws_iam_role.connector_role.arn
}
Apply this new version of the code with Terraform, and you will be able to visualize the logs of the worker nodes using CloudWatch.
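One simple way to follow those logs from the terminal is to tail the log group created earlier; the command below assumes the AWS CLI v2 is installed and configured:

# Stream the connector worker logs from the CloudWatch log group.
aws logs tail kafka_connect_logs --follow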
Whenever you feel you have played enough with the Kafka Connect connector, don't forget to destroy all the resources you created to avoid ending up with an undesired bill to pay.
terraform destroy -auto-approve