MSK

Queue
Amazon Managed Streaming for Apache Kafka (MSK) is a managed service used for building and running Apache Kafka applications to process streaming data.
aws_msk_cluster
MSK attributes:
  • broker_node_group_info - (Required) Configuration block for the broker nodes of the Kafka cluster.
  • cluster_name - (Required) Name of the MSK cluster.
  • kafka_version - (Required) Specify the desired Kafka software version.
  • number_of_broker_nodes - (Required) The desired total number of broker nodes in the Kafka cluster. It must be a multiple of the number of specified client subnets.
  • client_authentication - (Optional) Configuration block for specifying client authentication. See below.
  • configuration_info - (Optional) Configuration block for specifying an MSK Configuration to attach to Kafka brokers. See below.
  • encryption_info - (Optional) Configuration block for specifying encryption. See below.
  • enhanced_monitoring - (Optional) Specify the desired enhanced MSK CloudWatch monitoring level. See Monitoring Amazon MSK with Amazon CloudWatch.
  • open_monitoring - (Optional) Configuration block for JMX and Node monitoring for the MSK cluster. See below.
  • logging_info - (Optional) Configuration block for streaming broker logs to CloudWatch Logs, S3, or Kinesis Data Firehose. See below.
  • tags - (Optional) A map of tags to assign to the resource. If configured with a provider default_tags configuration block present, tags with matching keys will overwrite those defined at the provider-level.
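
As an illustration of the required arguments only, the following is a minimal sketch rather than a recommended configuration. The variables client_subnet_ids, security_group_ids and brokers_per_subnet are hypothetical names introduced for this example. Deriving the broker count from the subnet list is one way to keep number_of_broker_nodes a multiple of the number of client subnets.

```hcl
# Hypothetical input variables; the names are illustrative only.
variable "client_subnet_ids" {
  type        = list(string)
  description = "Subnets in the client VPC that host the brokers."
}

variable "security_group_ids" {
  type        = list(string)
  description = "Security groups attached to the broker network interfaces."
}

variable "brokers_per_subnet" {
  type    = number
  default = 1
}

resource "aws_msk_cluster" "minimal" {
  cluster_name  = "minimal-example"
  kafka_version = "3.2.0"

  # Multiplying by the subnet count keeps the total a multiple of the
  # number of client subnets, as number_of_broker_nodes requires.
  number_of_broker_nodes = length(var.client_subnet_ids) * var.brokers_per_subnet

  broker_node_group_info {
    instance_type   = "kafka.m5.large"
    client_subnets  = var.client_subnet_ids
    security_groups = var.security_group_ids
  }
}
```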

broker_node_group_info Argument Reference

  • client_subnets - (Required) A list of subnets to connect to in client VPC (documentation).
  • ebs_volume_size - (Optional, Deprecated; use storage_info.ebs_storage_info.volume_size instead) The size in GiB of the EBS volume for the data drive on each broker node.
  • instance_type - (Required) Specify the instance type to use for the Kafka brokers, e.g. kafka.m5.large. (Pricing info)
  • security_groups - (Required) A list of the security groups to associate with the elastic network interfaces to control who can communicate with the cluster.
  • az_distribution - (Optional) The distribution of broker nodes across availability zones (documentation). Currently the only valid value is DEFAULT.
  • connectivity_info - (Optional) Information about the cluster access configuration. See below. For security reasons, you can't turn on public access while creating an MSK cluster. However, you can update an existing cluster to make it publicly accessible. You can also create a new cluster and then update it to make it publicly accessible (documentation).
  • storage_info - (Optional) A block that contains information about storage volumes attached to MSK broker nodes. See below.

broker_node_group_info connectivity_info Argument Reference

  • public_access - (Optional) Access control settings for brokers. See below.

connectivity_info public_access Argument Reference

  • type - (Optional) Public access type. Valid values: DISABLED, SERVICE_PROVIDED_EIPS.
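
The nesting of connectivity_info and public_access can be sketched as the fragment below. It is not a complete resource: the other required arguments are left out and would need to be filled in as in the full example on this page.

```hcl
resource "aws_msk_cluster" "example" {
  # Fragment: cluster_name, kafka_version, number_of_broker_nodes and the
  # remaining broker_node_group_info arguments are omitted here.

  broker_node_group_info {
    connectivity_info {
      public_access {
        # Public access cannot be turned on while the cluster is created,
        # so start with DISABLED. Changing this to "SERVICE_PROVIDED_EIPS"
        # is only possible as an update to an existing cluster.
        type = "DISABLED"
      }
    }
  }
}
```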

broker_node_group_info storage_info Argument Reference

  • ebs_storage_info - (Optional) A block that contains EBS volume information. See below.

storage_info ebs_storage_info Argument Reference

  • provisioned_throughput - (Optional) A block that contains EBS volume provisioned throughput information. To provision storage throughput, you must choose broker type kafka.m5.4xlarge or larger. See below.
  • volume_size - (Optional) The size in GiB of the EBS volume for the data drive on each broker node. Minimum value of 1 and maximum value of 16384.

ebs_storage_info provisioned_throughput Argument Reference

  • enabled - (Optional) Controls whether provisioned throughput is enabled or not. Default value: false.
  • volume_throughput - (Optional) Throughput of the EBS volume for the data drive on each Kafka broker node, in MiB per second. The minimum value is 250. The maximum value varies by broker type; see the documentation on throughput bottlenecks for the valid maximum for each type.
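
A sketch of how storage_info, ebs_storage_info and provisioned_throughput nest, assuming the smallest broker type the text above allows and the minimum throughput value. The volume size is an arbitrary illustrative figure, and the other required arguments are omitted.

```hcl
resource "aws_msk_cluster" "example" {
  # Fragment: cluster_name, kafka_version, number_of_broker_nodes,
  # client_subnets and security_groups are omitted here.

  broker_node_group_info {
    # Provisioned throughput needs kafka.m5.4xlarge or larger.
    instance_type = "kafka.m5.4xlarge"

    storage_info {
      ebs_storage_info {
        volume_size = 1000 # GiB per broker (illustrative value)

        provisioned_throughput {
          enabled           = true
          volume_throughput = 250 # MiB per second, the documented minimum
        }
      }
    }
  }
}
```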

client_authentication Argument Reference

  • sasl - (Optional) Configuration block for specifying SASL client authentication. See below.
  • tls - (Optional) Configuration block for specifying TLS client authentication. See below.
  • unauthenticated - (Optional) Enables unauthenticated access.

client_authentication sasl Argument Reference

  • iam - (Optional) Enables IAM client authentication. Defaults to false.
  • scram - (Optional) Enables SCRAM client authentication via AWS Secrets Manager. Defaults to false.

client_authentication tls Argument Reference

  • certificate_authority_arns - (Optional) List of ACM Certificate Authority Amazon Resource Names (ARNs).
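
The client_authentication blocks above might be combined as in this fragment, which enables IAM and TLS client authentication together. The variable private_ca_arn is a hypothetical name for the ARN of an existing ACM Private CA; the other required cluster arguments are omitted.

```hcl
# Hypothetical variable: ARN of an existing ACM Private Certificate Authority.
variable "private_ca_arn" {
  type = string
}

resource "aws_msk_cluster" "example" {
  # Fragment: cluster_name, kafka_version, number_of_broker_nodes and
  # broker_node_group_info are omitted here.

  client_authentication {
    sasl {
      iam = true # defaults to false when omitted
    }

    tls {
      certificate_authority_arns = [var.private_ca_arn]
    }
  }
}
```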

configuration_info Argument Reference

  • arn - (Required) Amazon Resource Name (ARN) of the MSK Configuration to use in the cluster.
  • revision - (Required) Revision of the MSK Configuration to use in the cluster.
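
One way to supply arn and revision is to manage the configuration with the aws_msk_configuration resource and reference its exported attributes. The sketch below assumes that approach; the resource name and the two server properties are illustrative choices, not requirements.

```hcl
resource "aws_msk_configuration" "example" {
  name           = "example-configuration"
  kafka_versions = ["3.2.0"]

  # Illustrative broker properties only.
  server_properties = <<PROPERTIES
auto.create.topics.enable = true
delete.topic.enable = true
PROPERTIES
}

resource "aws_msk_cluster" "example" {
  # Fragment: cluster_name, kafka_version, number_of_broker_nodes and
  # broker_node_group_info are omitted here.

  configuration_info {
    arn      = aws_msk_configuration.example.arn
    revision = aws_msk_configuration.example.latest_revision
  }
}
```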

encryption_info Argument Reference

  • encryption_in_transit - (Optional) Configuration block to specify encryption in transit. See below.
  • encryption_at_rest_kms_key_arn - (Optional) You may specify a KMS key short ID or ARN (it will always output an ARN) to use for encrypting your data at rest. If no key is specified, an AWS managed KMS ('aws/msk' managed service) key will be used for encrypting the data at rest.

encryption_info encryption_in_transit Argument Reference

  • client_broker - (Optional) Encryption setting for data in transit between clients and brokers. Valid values: TLS, TLS_PLAINTEXT, and PLAINTEXT. Default value is TLS.
  • in_cluster - (Optional) Whether data communication among broker nodes is encrypted. Default value: true.
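
The two encryption_in_transit settings can be written out explicitly, as in this fragment. Both values shown are the documented defaults, so spelling them out only makes the intent visible in code review; the other required cluster arguments are omitted.

```hcl
resource "aws_msk_cluster" "example" {
  # Fragment: cluster_name, kafka_version, number_of_broker_nodes and
  # broker_node_group_info are omitted here.

  encryption_info {
    encryption_in_transit {
      client_broker = "TLS" # alternatives: "TLS_PLAINTEXT", "PLAINTEXT"
      in_cluster    = true  # encrypt traffic between broker nodes
    }
  }
}
```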

open_monitoring Argument Reference

  • prometheus - (Required) Configuration block for Prometheus settings for open monitoring. See below.

open_monitoring prometheus Argument Reference

  • jmx_exporter - (Optional) Configuration block for JMX Exporter. See below.
  • node_exporter - (Optional) Configuration block for Node Exporter. See below.

open_monitoring prometheus jmx_exporter Argument Reference

  • enabled_in_broker - (Required) Indicates whether you want to enable or disable the JMX Exporter.

open_monitoring prometheus node_exporter Argument Reference

  • enabled_in_broker - (Required) Indicates whether you want to enable or disable the Node Exporter.

logging_info Argument Reference

  • broker_logs - (Required) Configuration block for Broker Logs settings for logging info. See below.

logging_info broker_logs cloudwatch_logs Argument Reference

  • enabled - (Optional) Indicates whether you want to enable or disable streaming broker logs to CloudWatch Logs.
  • log_group - (Optional) Name of the CloudWatch Log Group to deliver logs to.

logging_info broker_logs firehose Argument Reference

  • enabled - (Optional) Indicates whether you want to enable or disable streaming broker logs to Kinesis Data Firehose.
  • delivery_stream - (Optional) Name of the Kinesis Data Firehose delivery stream to deliver logs to.

logging_info broker_logs s3 Argument Reference

  • enabled - (Optional) Indicates whether you want to enable or disable streaming broker logs to S3.
  • bucket - (Optional) Name of the S3 bucket to deliver logs to.
  • prefix - (Optional) Prefix to append to the folder name.

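Associating resources with an MSK cluster
Resources do not "belong" to an MSK cluster. Rather, one or more security groups are associated with the cluster's broker network interfaces (the security_groups argument of broker_node_group_info) and control which resources can communicate with the cluster.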
Create MSK via Terraform:
The following HCL creates a basic MSK cluster.
Syntax:

resource "aws_vpc" "vpc" {
 cidr_block = "192.168.0.0/22"
}

data "aws_availability_zones" "azs" {
 state = "available"
}

resource "aws_subnet" "subnet_az1" {
 availability_zone = data.aws_availability_zones.azs.names[0]
 cidr_block        = "192.168.0.0/24"
 vpc_id            = aws_vpc.vpc.id
}

resource "aws_subnet" "subnet_az2" {
 availability_zone = data.aws_availability_zones.azs.names[1]
 cidr_block        = "192.168.1.0/24"
 vpc_id            = aws_vpc.vpc.id
}

resource "aws_subnet" "subnet_az3" {
 availability_zone = data.aws_availability_zones.azs.names[2]
 cidr_block        = "192.168.2.0/24"
 vpc_id            = aws_vpc.vpc.id
}

resource "aws_security_group" "sg" {
 vpc_id = aws_vpc.vpc.id
}

resource "aws_kms_key" "kms" {
 description = "example"
}

resource "aws_cloudwatch_log_group" "test" {
 name = "msk_broker_logs"
}

resource "aws_s3_bucket" "bucket" {
 bucket = "msk-broker-logs-bucket"
}

resource "aws_s3_bucket_acl" "bucket_acl" {
 bucket = aws_s3_bucket.bucket.id
 acl    = "private"
}

resource "aws_iam_role" "firehose_role" {
 name = "firehose_test_role"

 assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
 {
   "Action": "sts:AssumeRole",
   "Principal": {
     "Service": "firehose.amazonaws.com"
   },
   "Effect": "Allow",
   "Sid": ""
 }
 ]
}
EOF
}

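resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
 name        = "terraform-kinesis-firehose-msk-broker-logs-stream"
 # The plain "s3" destination was removed in AWS provider v5;
 # "extended_s3" is accepted by both v4 and v5.
 destination = "extended_s3"

 extended_s3_configuration {
   role_arn   = aws_iam_role.firehose_role.arn
   bucket_arn = aws_s3_bucket.bucket.arn
 }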

 tags = {
   LogDeliveryEnabled = "placeholder"
 }

 lifecycle {
   ignore_changes = [
     tags["LogDeliveryEnabled"],
   ]
 }
}

resource "aws_msk_cluster" "example" {
 cluster_name           = "example"
 kafka_version          = "3.2.0"
 number_of_broker_nodes = 3

 broker_node_group_info {
   instance_type = "kafka.m5.large"
   client_subnets = [
     aws_subnet.subnet_az1.id,
     aws_subnet.subnet_az2.id,
     aws_subnet.subnet_az3.id,
   ]
   storage_info {
     ebs_storage_info {
       volume_size = 1000
     }
   }
   security_groups = [aws_security_group.sg.id]
 }

 encryption_info {
   encryption_at_rest_kms_key_arn = aws_kms_key.kms.arn
 }

 open_monitoring {
   prometheus {
     jmx_exporter {
       enabled_in_broker = true
     }
     node_exporter {
       enabled_in_broker = true
     }
   }
 }

 logging_info {
   broker_logs {
     cloudwatch_logs {
       enabled   = true
       log_group = aws_cloudwatch_log_group.test.name
     }
     firehose {
       enabled         = true
       delivery_stream = aws_kinesis_firehose_delivery_stream.test_stream.name
     }
     s3 {
       enabled = true
       bucket  = aws_s3_bucket.bucket.id
       prefix  = "logs/msk-"
     }
   }
 }

 tags = {
   foo = "bar"
 }
}

output "zookeeper_connect_string" {
 value = aws_msk_cluster.example.zookeeper_connect_string
}

output "bootstrap_brokers_tls" {
 description = "TLS connection host:port pairs"
 value       = aws_msk_cluster.example.bootstrap_brokers_tls
}

Create MSK via CLI:
Parameters:

create-cluster
--broker-node-group-info <value>
[--client-authentication <value>]
--cluster-name <value>
[--configuration-info <value>]
[--encryption-info <value>]
[--enhanced-monitoring <value>]
[--open-monitoring <value>]
--kafka-version <value>
[--logging-info <value>]
--number-of-broker-nodes <value>
[--tags <value>]
[--cli-input-json | --cli-input-yaml]
[--generate-cli-skeleton <value>]
[--debug]
[--endpoint-url <value>]
[--no-verify-ssl]
[--no-paginate]
[--output <value>]
[--query <value>]
[--profile <value>]
[--region <value>]
[--version <value>]
[--color <value>]
[--no-sign-request]
[--ca-bundle <value>]
[--cli-read-timeout <value>]
[--cli-connect-timeout <value>]
[--cli-binary-format <value>]
[--no-cli-pager]
[--cli-auto-prompt]
[--no-cli-auto-prompt]

Example:

aws kafka create-cluster \
   --cluster-name "MessagingCluster" \
   --broker-node-group-info file://brokernodegroupinfo.json \
   --kafka-version "2.2.1" \
   --number-of-broker-nodes 3

Best Practices for MSK

Categorized by Availability, Security & Compliance and Cost

  • Critical - Ensure MSK (Kafka) broker instances are not publicly accessible
  • Info - Ensure MSK (Kafka) cluster is not using an unsupported Kafka version (2.4.1)
  • Warning - Ensure MSK (Kafka) clusters have encryption in transit enabled between brokers within a cluster
  • Warning - Ensure MSK (Kafka) clusters have encryption in transit enabled between clients and brokers using TLS
  • Critical - Ensure default security groups are not in use by MSK
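
For the rule about default security groups, a dedicated group could look like the sketch below. The variables vpc_id and client_cidr_blocks are hypothetical names, and the single ingress rule assumes clients connect over TLS on port 9094, the port MSK brokers use for TLS listeners inside the VPC. Adjust the port if a different authentication mode is used.

```hcl
# Hypothetical input variables; the names are illustrative only.
variable "vpc_id" {
  type = string
}

variable "client_cidr_blocks" {
  type        = list(string)
  description = "Networks that are allowed to reach the brokers."
}

resource "aws_security_group" "msk_brokers" {
  name_prefix = "msk-brokers-"
  description = "Dedicated security group for MSK brokers"
  vpc_id      = var.vpc_id

  ingress {
    description = "Kafka over TLS from known client networks"
    from_port   = 9094
    to_port     = 9094
    protocol    = "tcp"
    cidr_blocks = var.client_cidr_blocks
  }
}
```

The group's id would then be passed in security_groups inside broker_node_group_info instead of the VPC's default group.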