Ensure MSK Connect Connectors are Encrypted in Transit
Overview
This check verifies that Amazon MSK Connect connectors have TLS encryption enabled for data transmitted between the connector and the Apache Kafka cluster. MSK Connect is a managed service that makes it easy to stream data to and from Apache Kafka clusters using Kafka Connect connectors.
When you create a connector, you specify how it communicates with your Kafka cluster. Without TLS encryption, data moves in plaintext over the network.
Risk
Without encryption in transit, data streams can be:
- Intercepted - Attackers can eavesdrop on network traffic and read sensitive data
- Modified - Man-in-the-middle attacks allow tampering with messages in flight
- Exploited - Credentials and secrets transmitted in plaintext can be captured
This exposes both the data confidentiality and integrity of your Kafka data pipelines.
Remediation Steps
Prerequisites
- AWS account access with permissions to manage MSK Connect connectors
- An existing MSK cluster with TLS encryption enabled (the cluster must support TLS for the connector to use it)
Important: Encryption settings cannot be changed after a connector is created. You must delete the non-compliant connector and create a new one with TLS enabled.
AWS Console Method
To check your existing connectors:
- Open the Amazon MSK console
- Select MSK Connect from the left navigation, then Connectors
- Click on your connector name
- Under Security, check the Encryption in transit setting
To create a new connector with TLS encryption:
- Open the Amazon MSK console
- Select MSK Connect from the left navigation, then Connectors
- Click Create connector
- Select a custom plugin and click Next
- Enter your connector name and configuration
- Select your MSK cluster
- Under Security:
- For Encryption in transit, select TLS
- Configure capacity, worker configuration, and logging as needed
- Select your service execution IAM role
- Click Create connector
To replace a non-compliant connector:
- Note the configuration of your existing connector (you will recreate it)
- Delete the non-compliant connector:
- Select the connector
- Choose Actions > Delete
- Confirm deletion
- Create a new connector following the steps above, ensuring TLS is selected
AWS CLI (optional)
List your MSK Connect connectors:
aws kafkaconnect list-connectors --region us-east-1
Check encryption settings for a specific connector:
aws kafkaconnect describe-connector \
--connector-arn arn:aws:kafkaconnect:us-east-1:123456789012:connector/my-connector/abc123 \
--region us-east-1 \
--query 'kafkaClusterEncryptionInTransit'
Delete a non-compliant connector:
aws kafkaconnect delete-connector \
--connector-arn arn:aws:kafkaconnect:us-east-1:123456789012:connector/my-connector/abc123 \
--region us-east-1
Create a new connector with TLS encryption:
aws kafkaconnect create-connector \
--connector-name my-secure-connector \
--kafkaconnect-version 2.7.1 \
--capacity '{
"autoScaling": {
"mcuCount": 1,
"minWorkerCount": 1,
"maxWorkerCount": 2,
"scaleInPolicy": {"cpuUtilizationPercentage": 20},
"scaleOutPolicy": {"cpuUtilizationPercentage": 80}
}
}' \
--connector-configuration '{
"connector.class": "com.example.MyConnector",
"tasks.max": "1",
"topics": "my-topic"
}' \
--kafka-cluster '{
"apacheKafkaCluster": {
"bootstrapServers": "b-1.mycluster.abc123.c1.kafka.us-east-1.amazonaws.com:9094",
"vpc": {
"securityGroups": ["sg-12345678"],
"subnets": ["subnet-111111", "subnet-222222", "subnet-333333"]
}
}
}' \
--kafka-cluster-client-authentication '{"authenticationType": "NONE"}' \
--kafka-cluster-encryption-in-transit '{"encryptionType": "TLS"}' \
--plugins '[{
"customPlugin": {
"customPluginArn": "arn:aws:kafkaconnect:us-east-1:123456789012:custom-plugin/my-plugin/abc123",
"revision": 1
}
}]' \
--service-execution-role-arn arn:aws:iam::123456789012:role/my-connector-role \
--region us-east-1
Replace the placeholder values with your actual:
- Bootstrap servers (use TLS port 9094)
- Security group and subnet IDs
- Custom plugin ARN and revision
- Service execution role ARN
CloudFormation (optional)
AWSTemplateFormatVersion: '2010-09-09'
Description: MSK Connect connector with encryption in transit enabled
Parameters:
ConnectorName:
Type: String
Description: Name of the MSK Connect connector
Default: my-connector
MinLength: 1
MaxLength: 128
KafkaConnectVersion:
Type: String
Description: Kafka Connect version
Default: '2.7.1'
BootstrapServers:
Type: String
Description: Bootstrap servers for the MSK cluster (comma-separated)
SubnetIds:
Type: List<AWS::EC2::Subnet::Id>
Description: Subnet IDs for the connector
SecurityGroupIds:
Type: List<AWS::EC2::SecurityGroup::Id>
Description: Security group IDs for the connector
CustomPluginArn:
Type: String
Description: ARN of the custom plugin
CustomPluginRevision:
Type: Number
Description: Revision number of the custom plugin
Default: 1
ServiceExecutionRoleArn:
Type: String
Description: ARN of the IAM role for the connector
Resources:
MSKConnector:
Type: AWS::KafkaConnect::Connector
Properties:
ConnectorName: !Ref ConnectorName
KafkaConnectVersion: !Ref KafkaConnectVersion
Capacity:
AutoScaling:
McuCount: 1
MinWorkerCount: 1
MaxWorkerCount: 2
ScaleInPolicy:
CpuUtilizationPercentage: 20
ScaleOutPolicy:
CpuUtilizationPercentage: 80
ConnectorConfiguration:
connector.class: com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkConnector
tasks.max: '1'
topics: example
KafkaCluster:
ApacheKafkaCluster:
BootstrapServers: !Ref BootstrapServers
Vpc:
SecurityGroups: !Ref SecurityGroupIds
Subnets: !Ref SubnetIds
KafkaClusterClientAuthentication:
AuthenticationType: NONE
KafkaClusterEncryptionInTransit:
EncryptionType: TLS
Plugins:
- CustomPlugin:
CustomPluginArn: !Ref CustomPluginArn
Revision: !Ref CustomPluginRevision
ServiceExecutionRoleArn: !Ref ServiceExecutionRoleArn
Outputs:
ConnectorArn:
Description: ARN of the MSK Connect connector
Value: !GetAtt MSKConnector.ConnectorArn
Key configuration:
EncryptionType: TLS- Enforces TLS encryption for all data in transit to the Kafka cluster
Terraform (optional)
terraform {
required_version = ">= 1.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
}
}
provider "aws" {
region = "us-east-1"
}
variable "connector_name" {
description = "Name of the MSK Connect connector"
type = string
default = "my-connector"
}
variable "kafkaconnect_version" {
description = "Kafka Connect version"
type = string
default = "2.7.1"
}
variable "bootstrap_servers" {
description = "Bootstrap servers for the MSK cluster"
type = string
}
variable "subnet_ids" {
description = "List of subnet IDs for the connector"
type = list(string)
}
variable "security_group_ids" {
description = "List of security group IDs for the connector"
type = list(string)
}
variable "custom_plugin_arn" {
description = "ARN of the custom plugin"
type = string
}
variable "custom_plugin_revision" {
description = "Revision number of the custom plugin"
type = number
default = 1
}
variable "service_execution_role_arn" {
description = "ARN of the IAM role for the connector"
type = string
}
resource "aws_mskconnect_connector" "main" {
name = var.connector_name
kafkaconnect_version = var.kafkaconnect_version
capacity {
autoscaling {
mcu_count = 1
min_worker_count = 1
max_worker_count = 2
scale_in_policy {
cpu_utilization_percentage = 20
}
scale_out_policy {
cpu_utilization_percentage = 80
}
}
}
connector_configuration = {
"connector.class" = "com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkConnector"
"tasks.max" = "1"
"topics" = "example"
}
kafka_cluster {
apache_kafka_cluster {
bootstrap_servers = var.bootstrap_servers
vpc {
security_groups = var.security_group_ids
subnets = var.subnet_ids
}
}
}
kafka_cluster_client_authentication {
authentication_type = "NONE"
}
kafka_cluster_encryption_in_transit {
encryption_type = "TLS"
}
plugin {
custom_plugin {
arn = var.custom_plugin_arn
revision = var.custom_plugin_revision
}
}
service_execution_role_arn = var.service_execution_role_arn
tags = {
Name = var.connector_name
}
}
output "connector_arn" {
description = "ARN of the MSK Connect connector"
value = aws_mskconnect_connector.main.arn
}
Key configuration:
encryption_type = "TLS"- Enforces TLS encryption for all data in transit to the Kafka cluster
Verification
After creating the new connector, verify encryption is properly configured:
- In the AWS Console, navigate to MSK Connect > Connectors
- Click on your connector name
- Under Security, confirm Encryption in transit shows TLS
CLI Verification
aws kafkaconnect describe-connector \
--connector-arn <your-connector-arn> \
--region us-east-1 \
--query 'kafkaClusterEncryptionInTransit'
Expected output for a properly configured connector:
{
"encryptionType": "TLS"
}
Additional Resources
- Amazon MSK Connect Documentation
- MSK Connect Security
- AWS::KafkaConnect::Connector CloudFormation Reference
- Terraform aws_mskconnect_connector Resource
Notes
- Encryption is immutable: Once a connector is created, you cannot change its encryption settings. You must delete and recreate the connector to enable TLS.
- MSK cluster must support TLS: Your underlying MSK cluster must have TLS encryption enabled for connectors to use TLS. Ensure your cluster's client-broker encryption is set to TLS or TLS_PLAINTEXT.
- Use TLS bootstrap servers: When configuring bootstrap servers, use the TLS endpoint (typically port 9094) rather than the plaintext endpoint (port 9092).
- Connector downtime: Deleting and recreating a connector will cause temporary data pipeline interruption. Plan this during a maintenance window if possible.
- Compliance: This control helps meet requirements for SOC 2, ISO 27001, KISA-ISMS-P, NIS2, and C5 frameworks.