Ensure MSK Connect Connectors are Encrypted in Transit

Overview

This check verifies that Amazon MSK Connect connectors have TLS encryption enabled for data transmitted between the connector and the Apache Kafka cluster. MSK Connect is a managed service that makes it easy to stream data to and from Apache Kafka clusters using Kafka Connect connectors.

When you create a connector, you specify how it communicates with your Kafka cluster. Without TLS encryption, data moves in plaintext over the network.

Risk

Without encryption in transit, data streams can be:

  • Intercepted - Attackers can eavesdrop on network traffic and read sensitive data
  • Modified - Man-in-the-middle attacks allow tampering with messages in flight
  • Exploited - Credentials and secrets transmitted in plaintext can be captured

This puts both the confidentiality and the integrity of the data in your Kafka pipelines at risk.

Remediation Steps

Prerequisites

  • AWS account access with permissions to manage MSK Connect connectors
  • An existing MSK cluster with TLS encryption enabled (the cluster must support TLS for the connector to use it)

Important: Encryption settings cannot be changed after a connector is created. You must delete the non-compliant connector and create a new one with TLS enabled.

AWS Console Method

To check your existing connectors:

  1. Open the Amazon MSK console
  2. Select MSK Connect from the left navigation, then Connectors
  3. Click on your connector name
  4. Under Security, check the Encryption in transit setting

To create a new connector with TLS encryption:

  1. Open the Amazon MSK console
  2. Select MSK Connect from the left navigation, then Connectors
  3. Click Create connector
  4. Select a custom plugin and click Next
  5. Enter your connector name and configuration
  6. Select your MSK cluster
  7. Under Security:
    • For Encryption in transit, select TLS
  8. Configure capacity, worker configuration, and logging as needed
  9. Select your service execution IAM role
  10. Click Create connector

To replace a non-compliant connector:

  1. Note the configuration of your existing connector (you will recreate it)
  2. Delete the non-compliant connector:
    • Select the connector
    • Choose Actions > Delete
    • Confirm deletion
  3. Create a new connector following the steps above, ensuring TLS is selected
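
The "note the configuration, then recreate it" step can be sketched in Python. This is a minimal sketch, not an official tool: it assumes you saved the JSON from `aws kafkaconnect describe-connector` and that the response uses the same field names as the create-connector request (confirm this against the API reference). The helper name `build_tls_replacement` and the `CARRIED_KEYS` list are hypothetical.

```python
import copy

# Assumption: these describe-connector fields can be reused as-is in a
# create-connector request. Review the list for your own connector.
CARRIED_KEYS = (
    "capacity",
    "connectorConfiguration",
    "connectorDescription",
    "kafkaCluster",
    "kafkaClusterClientAuthentication",
    "kafkaConnectVersion",
    "logDelivery",
    "plugins",
    "serviceExecutionRoleArn",
    "workerConfiguration",
)


def build_tls_replacement(described: dict, new_name: str) -> dict:
    """Return a create-connector style request with TLS forced on.

    `described` is the parsed describe-connector JSON. Read-only fields
    such as connectorArn and connectorState are intentionally dropped.
    """
    request = {
        key: copy.deepcopy(described[key])
        for key in CARRIED_KEYS
        if key in described
    }
    request["connectorName"] = new_name
    # The one setting this remediation is about.
    request["kafkaClusterEncryptionInTransit"] = {"encryptionType": "TLS"}
    return request
```

The sketch does not rewrite `bootstrapServers`. If the old connector pointed at the plaintext port, update those endpoints by hand before you create the replacement.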

AWS CLI (optional)

List your MSK Connect connectors:

aws kafkaconnect list-connectors --region us-east-1
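
If you have many connectors, you can scan the output of the command above instead of opening each one. The sketch below assumes you saved that output as JSON and that each connector summary carries a `kafkaClusterEncryptionInTransit` field. The function name `non_compliant_connectors` is hypothetical.

```python
import json
from typing import List


def non_compliant_connectors(list_output: str) -> List[str]:
    """Return names of connectors whose encryption type is not TLS.

    `list_output` is the raw JSON text printed by list-connectors.
    A summary with no encryption field is treated as non-compliant.
    """
    data = json.loads(list_output)
    flagged = []
    for summary in data.get("connectors", []):
        encryption = summary.get("kafkaClusterEncryptionInTransit") or {}
        if encryption.get("encryptionType") != "TLS":
            flagged.append(summary.get("connectorName", "<unnamed>"))
    return flagged
```

For example, redirect the CLI output to a file and pass the file contents to `non_compliant_connectors`. Each name it returns is a candidate for the delete-and-recreate steps.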

Check encryption settings for a specific connector:

aws kafkaconnect describe-connector \
--connector-arn arn:aws:kafkaconnect:us-east-1:123456789012:connector/my-connector/abc123 \
--region us-east-1 \
--query 'kafkaClusterEncryptionInTransit'

Delete a non-compliant connector:

aws kafkaconnect delete-connector \
--connector-arn arn:aws:kafkaconnect:us-east-1:123456789012:connector/my-connector/abc123 \
--region us-east-1

Create a new connector with TLS encryption:

aws kafkaconnect create-connector \
--connector-name my-secure-connector \
--kafka-connect-version 2.7.1 \
--capacity '{
  "autoScaling": {
    "mcuCount": 1,
    "minWorkerCount": 1,
    "maxWorkerCount": 2,
    "scaleInPolicy": {"cpuUtilizationPercentage": 20},
    "scaleOutPolicy": {"cpuUtilizationPercentage": 80}
  }
}' \
--connector-configuration '{
  "connector.class": "com.example.MyConnector",
  "tasks.max": "1",
  "topics": "my-topic"
}' \
--kafka-cluster '{
  "apacheKafkaCluster": {
    "bootstrapServers": "b-1.mycluster.abc123.c1.kafka.us-east-1.amazonaws.com:9094",
    "vpc": {
      "securityGroups": ["sg-12345678"],
      "subnets": ["subnet-111111", "subnet-222222", "subnet-333333"]
    }
  }
}' \
--kafka-cluster-client-authentication '{"authenticationType": "NONE"}' \
--kafka-cluster-encryption-in-transit '{"encryptionType": "TLS"}' \
--plugins '[{
  "customPlugin": {
    "customPluginArn": "arn:aws:kafkaconnect:us-east-1:123456789012:custom-plugin/my-plugin/abc123",
    "revision": 1
  }
}]' \
--service-execution-role-arn arn:aws:iam::123456789012:role/my-connector-role \
--region us-east-1

Replace the placeholder values with your own:

  • Bootstrap servers (use TLS port 9094)
  • Security group and subnet IDs
  • Custom plugin ARN and revision
  • Service execution role ARN
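
A quick check can catch a plaintext endpoint in the bootstrap server string before you create the connector. The sketch below assumes the standard MSK listener ports, where 9092 is plaintext and 9094 is TLS. The function name `plaintext_brokers` is hypothetical.

```python
from typing import List

# Assumption: standard MSK plaintext listener port.
PLAINTEXT_PORT = "9092"


def plaintext_brokers(bootstrap_servers: str) -> List[str]:
    """Return the host:port entries that point at the plaintext port.

    `bootstrap_servers` is the comma-separated string passed to the
    connector, e.g. "b-1.example:9094,b-2.example:9094".
    """
    flagged = []
    for entry in bootstrap_servers.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # rpartition keeps everything before the last colon as the host.
        _host, _sep, port = entry.rpartition(":")
        if port == PLAINTEXT_PORT:
            flagged.append(entry)
    return flagged
```

An empty result means no entry uses the plaintext port. It does not prove that the cluster itself has TLS enabled, so the prerequisite above still applies.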

CloudFormation (optional)

AWSTemplateFormatVersion: '2010-09-09'
Description: MSK Connect connector with encryption in transit enabled

Parameters:
  ConnectorName:
    Type: String
    Description: Name of the MSK Connect connector
    Default: my-connector
    MinLength: 1
    MaxLength: 128

  KafkaConnectVersion:
    Type: String
    Description: Kafka Connect version
    Default: '2.7.1'

  BootstrapServers:
    Type: String
    Description: Bootstrap servers for the MSK cluster (comma-separated)

  SubnetIds:
    Type: List<AWS::EC2::Subnet::Id>
    Description: Subnet IDs for the connector

  SecurityGroupIds:
    Type: List<AWS::EC2::SecurityGroup::Id>
    Description: Security group IDs for the connector

  CustomPluginArn:
    Type: String
    Description: ARN of the custom plugin

  CustomPluginRevision:
    Type: Number
    Description: Revision number of the custom plugin
    Default: 1

  ServiceExecutionRoleArn:
    Type: String
    Description: ARN of the IAM role for the connector

Resources:
  MSKConnector:
    Type: AWS::KafkaConnect::Connector
    Properties:
      ConnectorName: !Ref ConnectorName
      KafkaConnectVersion: !Ref KafkaConnectVersion
      Capacity:
        AutoScaling:
          McuCount: 1
          MinWorkerCount: 1
          MaxWorkerCount: 2
          ScaleInPolicy:
            CpuUtilizationPercentage: 20
          ScaleOutPolicy:
            CpuUtilizationPercentage: 80
      ConnectorConfiguration:
        connector.class: com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkConnector
        tasks.max: '1'
        topics: example
      KafkaCluster:
        ApacheKafkaCluster:
          BootstrapServers: !Ref BootstrapServers
          Vpc:
            SecurityGroups: !Ref SecurityGroupIds
            Subnets: !Ref SubnetIds
      KafkaClusterClientAuthentication:
        AuthenticationType: NONE
      KafkaClusterEncryptionInTransit:
        EncryptionType: TLS
      Plugins:
        - CustomPlugin:
            CustomPluginArn: !Ref CustomPluginArn
            Revision: !Ref CustomPluginRevision
      ServiceExecutionRoleArn: !Ref ServiceExecutionRoleArn

Outputs:
  ConnectorArn:
    Description: ARN of the MSK Connect connector
    Value: !GetAtt MSKConnector.ConnectorArn

Key configuration:

  • EncryptionType: TLS - Enforces TLS encryption for all data in transit to the Kafka cluster
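
To catch a missing or incorrect EncryptionType before deployment, you could lint the template. The sketch below assumes the template is already loaded as a Python dict, for example from a JSON version of the template. Plain YAML loaders do not understand short-form tags such as !Ref, so loading the YAML directly is out of scope here. The function name `connectors_without_tls` is hypothetical.

```python
from typing import List

CONNECTOR_TYPE = "AWS::KafkaConnect::Connector"


def connectors_without_tls(template: dict) -> List[str]:
    """Return logical IDs of connector resources not pinned to TLS.

    A value that is not the literal string "TLS" (including a Ref to
    a parameter) is flagged so that a human can review it.
    """
    flagged = []
    for logical_id, resource in template.get("Resources", {}).items():
        if resource.get("Type") != CONNECTOR_TYPE:
            continue
        properties = resource.get("Properties") or {}
        encryption = properties.get("KafkaClusterEncryptionInTransit") or {}
        if encryption.get("EncryptionType") != "TLS":
            flagged.append(logical_id)
    return flagged
```

The check is deliberately strict. It flags parameterized values because it cannot know what they resolve to at deploy time.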

Terraform (optional)

terraform {
  required_version = ">= 1.0"
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

provider "aws" {
  region = "us-east-1"
}

variable "connector_name" {
  description = "Name of the MSK Connect connector"
  type        = string
  default     = "my-connector"
}

variable "kafkaconnect_version" {
  description = "Kafka Connect version"
  type        = string
  default     = "2.7.1"
}

variable "bootstrap_servers" {
  description = "Bootstrap servers for the MSK cluster"
  type        = string
}

variable "subnet_ids" {
  description = "List of subnet IDs for the connector"
  type        = list(string)
}

variable "security_group_ids" {
  description = "List of security group IDs for the connector"
  type        = list(string)
}

variable "custom_plugin_arn" {
  description = "ARN of the custom plugin"
  type        = string
}

variable "custom_plugin_revision" {
  description = "Revision number of the custom plugin"
  type        = number
  default     = 1
}

variable "service_execution_role_arn" {
  description = "ARN of the IAM role for the connector"
  type        = string
}

resource "aws_mskconnect_connector" "main" {
  name = var.connector_name

  kafkaconnect_version = var.kafkaconnect_version

  capacity {
    autoscaling {
      mcu_count        = 1
      min_worker_count = 1
      max_worker_count = 2

      scale_in_policy {
        cpu_utilization_percentage = 20
      }

      scale_out_policy {
        cpu_utilization_percentage = 80
      }
    }
  }

  connector_configuration = {
    "connector.class" = "com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkConnector"
    "tasks.max"       = "1"
    "topics"          = "example"
  }

  kafka_cluster {
    apache_kafka_cluster {
      bootstrap_servers = var.bootstrap_servers

      vpc {
        security_groups = var.security_group_ids
        subnets         = var.subnet_ids
      }
    }
  }

  kafka_cluster_client_authentication {
    authentication_type = "NONE"
  }

  kafka_cluster_encryption_in_transit {
    encryption_type = "TLS"
  }

  plugin {
    custom_plugin {
      arn      = var.custom_plugin_arn
      revision = var.custom_plugin_revision
    }
  }

  service_execution_role_arn = var.service_execution_role_arn

  tags = {
    Name = var.connector_name
  }
}

output "connector_arn" {
  description = "ARN of the MSK Connect connector"
  value       = aws_mskconnect_connector.main.arn
}

Key configuration:

  • encryption_type = "TLS" - Enforces TLS encryption for all data in transit to the Kafka cluster
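
The same setting can be checked in a Terraform plan before apply. The sketch below assumes you exported the plan with `terraform show -json` and loaded it as a dict. It also assumes that nested blocks such as `kafka_cluster_encryption_in_transit` appear as a list of objects under each resource's `values`. Verify that shape against your own plan output. The function name `plan_connectors_without_tls` is hypothetical.

```python
from typing import List

RESOURCE_TYPE = "aws_mskconnect_connector"


def plan_connectors_without_tls(plan: dict) -> List[str]:
    """Return addresses of planned connectors not set to TLS.

    Walks the root module and any child modules under planned_values.
    """
    flagged: List[str] = []

    def walk(module: dict) -> None:
        for resource in module.get("resources", []):
            if resource.get("type") != RESOURCE_TYPE:
                continue
            values = resource.get("values") or {}
            blocks = values.get("kafka_cluster_encryption_in_transit") or []
            types = [block.get("encryption_type") for block in blocks]
            # Exactly one block, set to TLS, counts as compliant.
            if types != ["TLS"]:
                flagged.append(resource.get("address", "<unknown>"))
        for child in module.get("child_modules", []):
            walk(child)

    walk(plan.get("planned_values", {}).get("root_module", {}))
    return flagged
```

A connector with no encryption block is flagged as well, because the sketch treats an unset value as unverified, not as safe.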

Verification

After creating the new connector, verify encryption is properly configured:

  1. In the AWS Console, navigate to MSK Connect > Connectors
  2. Click on your connector name
  3. Under Security, confirm Encryption in transit shows TLS

CLI Verification

aws kafkaconnect describe-connector \
--connector-arn <your-connector-arn> \
--region us-east-1 \
--query 'kafkaClusterEncryptionInTransit'

Expected output for a properly configured connector:

{
"encryptionType": "TLS"
}
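
If you want to automate this comparison, for instance in a post-deployment script, a small parser is enough. The sketch below assumes the input is the JSON text printed by the describe-connector command with the `--query` shown above, which prints `null` when the field is absent. The function name `is_tls_enabled` is hypothetical.

```python
import json


def is_tls_enabled(query_output: str) -> bool:
    """Return True only if the queried encryption type is TLS.

    `query_output` is the JSON text from describe-connector with
    --query 'kafkaClusterEncryptionInTransit'.
    """
    parsed = json.loads(query_output)
    # A missing field comes back as JSON null, which parses to None.
    return isinstance(parsed, dict) and parsed.get("encryptionType") == "TLS"
```

Any result other than True means the connector should go through the replacement steps again.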

Notes

  • Encryption is immutable: Once a connector is created, you cannot change its encryption settings. You must delete and recreate the connector to enable TLS.
  • MSK cluster must support TLS: Your underlying MSK cluster must have TLS encryption enabled for connectors to use TLS. Ensure your cluster's client-broker encryption is set to TLS or TLS_PLAINTEXT.
  • Use TLS bootstrap servers: When configuring bootstrap servers, use the TLS endpoint (typically port 9094) rather than the plaintext endpoint (port 9092).
  • Connector downtime: Deleting and recreating a connector will cause temporary data pipeline interruption. Plan this during a maintenance window if possible.
  • Compliance: This control helps meet requirements for SOC 2, ISO 27001, KISA-ISMS-P, NIS2, and C5 frameworks.