
Perform chaos testing

Shield functionality

This feature is available with Conduktor Shield only.

Chaos testing Interceptors in Conduktor are like stress tests for your data systems.

Performing chaos testing allows you to see how well your applications can handle problems like slow data delivery, corrupted messages or unexpected duplicates. This helps ensure that your systems are resilient and keep running smoothly even when things go wrong.

Simulate broken brokers

This Interceptor injects intermittent errors into client connections to brokers, consistent with broker-side issues.

This only works on Produce and Fetch requests.

Configuration

| key | type | default | description |
|---|---|---|---|
| rateInPercent | int | | The percentage of requests that will receive an error response (see errorMap). |
| errorMap | Map | `{"FETCH": "UNKNOWN_SERVER_ERROR", "PRODUCE": "CORRUPT_MESSAGE"}` | Map of API key (FETCH or PRODUCE) to the error to return for that request type. |

Possible errors for each API key

FETCH

  • OFFSET_OUT_OF_RANGE
  • TOPIC_AUTHORIZATION_FAILED
  • REPLICA_NOT_AVAILABLE
  • NOT_LEADER_OR_FOLLOWER
  • FENCED_LEADER_EPOCH
  • UNKNOWN_LEADER_EPOCH
  • UNKNOWN_TOPIC_OR_PARTITION
  • KAFKA_STORAGE_ERROR
  • UNSUPPORTED_COMPRESSION_TYPE
  • CORRUPT_MESSAGE
  • UNKNOWN_TOPIC_ID
  • FETCH_SESSION_TOPIC_ID_ERROR
  • INCONSISTENT_TOPIC_ID
  • UNKNOWN_SERVER_ERROR

PRODUCE

  • CORRUPT_MESSAGE
  • UNKNOWN_TOPIC_OR_PARTITION
  • NOT_LEADER_OR_FOLLOWER
  • INVALID_TOPIC_EXCEPTION
  • RECORD_LIST_TOO_LARGE
  • NOT_ENOUGH_REPLICAS
  • NOT_ENOUGH_REPLICAS_AFTER_APPEND
  • INVALID_REQUIRED_ACKS
  • TOPIC_AUTHORIZATION_FAILED
  • UNSUPPORTED_FOR_MESSAGE_FORMAT
  • INVALID_PRODUCER_EPOCH
  • CLUSTER_AUTHORIZATION_FAILED
  • TRANSACTIONAL_ID_AUTHORIZATION_FAILED
  • INVALID_RECORD
Example
```json
{
  "name": "myBrokenBrokerChaosInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.SimulateBrokenBrokersPlugin",
  "priority": 100,
  "config": {
    "rateInPercent": 100,
    "errorMap": {
      "FETCH": "UNKNOWN_SERVER_ERROR",
      "PRODUCE": "CORRUPT_MESSAGE"
    }
  }
}
```
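The Interceptor accepts only the errors listed above for each API key. Below is a minimal Python sketch that checks a config object against those lists before you deploy it.

It is not part of Conduktor Gateway. The function name validate_broken_broker_config is hypothetical. The sketch also assumes that rateInPercent must lie between 0 and 100.

```python
# Error names copied from the "Possible errors for each API key" lists above.
ALLOWED_ERRORS = {
    "FETCH": {
        "OFFSET_OUT_OF_RANGE", "TOPIC_AUTHORIZATION_FAILED", "REPLICA_NOT_AVAILABLE",
        "NOT_LEADER_OR_FOLLOWER", "FENCED_LEADER_EPOCH", "UNKNOWN_LEADER_EPOCH",
        "UNKNOWN_TOPIC_OR_PARTITION", "KAFKA_STORAGE_ERROR", "UNSUPPORTED_COMPRESSION_TYPE",
        "CORRUPT_MESSAGE", "UNKNOWN_TOPIC_ID", "FETCH_SESSION_TOPIC_ID_ERROR",
        "INCONSISTENT_TOPIC_ID", "UNKNOWN_SERVER_ERROR",
    },
    "PRODUCE": {
        "CORRUPT_MESSAGE", "UNKNOWN_TOPIC_OR_PARTITION", "NOT_LEADER_OR_FOLLOWER",
        "INVALID_TOPIC_EXCEPTION", "RECORD_LIST_TOO_LARGE", "NOT_ENOUGH_REPLICAS",
        "NOT_ENOUGH_REPLICAS_AFTER_APPEND", "INVALID_REQUIRED_ACKS",
        "TOPIC_AUTHORIZATION_FAILED", "UNSUPPORTED_FOR_MESSAGE_FORMAT",
        "INVALID_PRODUCER_EPOCH", "CLUSTER_AUTHORIZATION_FAILED",
        "TRANSACTIONAL_ID_AUTHORIZATION_FAILED", "INVALID_RECORD",
    },
}


def validate_broken_broker_config(config: dict) -> list[str]:
    """Return the problems found in the Interceptor's "config" object (hypothetical helper)."""
    problems = []
    if "rateInPercent" in config:
        rate = config["rateInPercent"]
        # Assumption: a percentage must be an integer from 0 to 100.
        if isinstance(rate, bool) or not isinstance(rate, int) or not 0 <= rate <= 100:
            problems.append(f"rateInPercent must be an int between 0 and 100, got {rate!r}")
    for api_key, error in config.get("errorMap", {}).items():
        allowed = ALLOWED_ERRORS.get(api_key)
        if allowed is None:
            problems.append(f"unsupported API key {api_key!r}; only FETCH and PRODUCE are listed")
        elif error not in allowed:
            problems.append(f"{error!r} is not a listed error for {api_key}")
    return problems


# The "config" object from the example above.
example = {
    "rateInPercent": 100,
    "errorMap": {"FETCH": "UNKNOWN_SERVER_ERROR", "PRODUCE": "CORRUPT_MESSAGE"},
}
print(validate_broken_broker_config(example))  # []
```

An empty list means every error in errorMap appears in the documented list for its API key.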

Simulate duplicate messages

This Interceptor duplicates records when the client produces them to, or consumes them from, Kafka.

Use it to check that your applications behave correctly when they receive duplicate records from Kafka.

Note: By default, this Interceptor acts on fetch (target is CONSUME). It therefore only duplicates the records returned to the client; the records stored on the broker are not duplicated.

For example, a message might say "Add £10 to a bank account, unique message ID 12345".

If that message is duplicated, both copies carry the same unique ID.

You need to verify that the client application credits the £10 only once.
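One way to pass this check is an idempotent consumer, which remembers the unique IDs it has already applied. Below is a minimal Python sketch, not Conduktor code.

The Account class and its apply_credit method are hypothetical. Amounts are held in pence to avoid floating-point rounding. The processed IDs are kept in memory only for illustration; a real application would need to store them durably, updated together with the balance, so that a restart does not forget them.

```python
from dataclasses import dataclass, field


@dataclass
class Account:
    """Hypothetical account that applies each credit message at most once."""

    balance_pence: int = 0
    processed_ids: set[str] = field(default_factory=set)

    def apply_credit(self, message_id: str, amount_pence: int) -> bool:
        """Apply a credit once per unique message ID; return False for a duplicate."""
        if message_id in self.processed_ids:
            return False  # already applied: ignore the duplicate
        self.balance_pence += amount_pence
        self.processed_ids.add(message_id)
        return True


# The "Add £10, unique message ID 12345" message delivered twice.
deliveries = [("12345", 1000), ("12345", 1000)]
account = Account()
results = [account.apply_credit(msg_id, amount) for msg_id, amount in deliveries]
print(account.balance_pence, results)  # 1000 [True, False]
```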

Configuration

| key | type | default | description |
|---|---|---|---|
| topic | String | `.*` | Topics that match this regex will have the interceptor applied. |
| rateInPercent | int | 100 | The percentage of records that will be duplicated. |
| target | enum | CONSUME | When records are duplicated: PRODUCE (when the client produces them) or CONSUME (when the client consumes them). |

Example
```json
{
  "name": "myDuplicateRecordsInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.DuplicateMessagesPlugin",
  "priority": 100,
  "config": {
    "topic": "client_topic_.*",
    "rateInPercent": 100,
    "target": "PRODUCE"
  }
}
```

Simulate invalid schema ID

This Interceptor overwrites the schema ID in records with an invalid value.

Because the schema ID is invalid, consuming the records fails with an error like the following:

```text
Processed a total of 1 messages
[2022-11-17 15:59:13,184] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Error retrieving JSON schema for id 999
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:182)
at io.confluent.kafka.formatter.json.JsonSchemaMessageFormatter$JsonSchemaMessageDeserializer.deserialize(JsonSchemaMessageFormatter.java:130)
at io.confluent.kafka.formatter.json.JsonSchemaMessageFormatter$JsonSchemaMessageDeserializer.deserialize(JsonSchemaMessageFormatter.java:103)
at io.confluent.kafka.formatter.json.JsonSchemaMessageFormatter.writeTo(JsonSchemaMessageFormatter.java:94)
at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:181)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:116)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:76)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema 999 not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:301)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:371)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:840)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:813)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:294)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:417)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:119)
... 8 more
```

See more about Schema Registry and schema IDs here.
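Confluent serializers, such as the JSON Schema serializer in the trace above, prefix each record value with a magic byte (0) followed by a 4-byte big-endian schema ID. A consumer can read that ID to see which schema it is about to fetch. Below is a minimal Python sketch.

It assumes the records use this Confluent wire format and that the Interceptor rewrites the ID in place. The record bytes and the set of registered IDs are hypothetical. In practice, the deserializer looks the ID up in Schema Registry rather than in a local set.

```python
import struct

MAGIC_BYTE = 0


def read_schema_id(value: bytes) -> int:
    """Return the schema ID from a record value in the Confluent wire format."""
    if len(value) < 5 or value[0] != MAGIC_BYTE:
        raise ValueError("record value is not in the Confluent wire format")
    # Bytes 1-4 hold the schema ID as a big-endian 32-bit integer.
    (schema_id,) = struct.unpack(">i", value[1:5])
    return schema_id


# Hypothetical record: magic byte, schema ID 999, then the JSON payload.
record = bytes([MAGIC_BYTE]) + struct.pack(">i", 999) + b'{"id": 1}'
registered_ids = {1, 2, 3}  # hypothetical IDs known to the registry
schema_id = read_schema_id(record)
print(schema_id, schema_id in registered_ids)  # 999 False
```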

Configuration

| key | type | default | description |
|---|---|---|---|
| topic | String | `.*` | Topics that match this regex will have the interceptor applied. |
| invalidSchemaId | integer | | The invalid schema ID to write. If not set, a random value is used. |
| target | enum | CONSUME | When the schema ID is overwritten: PRODUCE (when the client produces the record) or CONSUME (when the client consumes it). |

Example
```json
{
  "name": "mySimulateInvalidSchemaIdInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.SimulateInvalidSchemaIdPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "invalidSchemaId": 9999,
    "target": "PRODUCE"
  }
}
```

Simulate latency on all interactions

This interceptor adds latency to a percentage of requests and responses flowing between your Kafka applications and your Kafka cluster.

Use it to check that your applications behave correctly when network delays slow down communication with Kafka, or when a Kafka broker responds slowly.

Configuration

| key | type | description |
|---|---|---|
| appliedPercentage | int | The percentage of requests flowing through the Gateway that will have latency added. For example, an appliedPercentage of 10 adds latencyMs of latency to 10% of requests and responses. The value must be between 0 and 100. |
| latencyMs | long | The latency, in milliseconds, added to the affected requests and responses flowing through the Gateway. The value must not be negative. |

Example
```json
{
  "name": "mySimulateLatencyInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.SimulateLatencyPlugin",
  "priority": 100,
  "config": {
    "appliedPercentage": 100,
    "latencyMs": 1000
  }
}
```
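The two settings combine into an average added delay of appliedPercentage / 100 × latencyMs per request. Below is a minimal Python sketch of that arithmetic and of a per-request decision.

It assumes each request is sampled independently. The function names are hypothetical, and this is not how the Gateway is implemented. With the example config above (appliedPercentage 100), every request is delayed by the full 1000 ms.

```python
import random


def added_latency_ms(applied_percentage: int, latency_ms: int, rng: random.Random) -> int:
    """Extra delay for one request: latency_ms with probability applied_percentage / 100, else 0."""
    if not 0 <= applied_percentage <= 100:
        raise ValueError("appliedPercentage must be between 0 and 100")
    return latency_ms if rng.random() * 100 < applied_percentage else 0


def expected_added_latency_ms(applied_percentage: int, latency_ms: int) -> float:
    """Average extra delay per request over many requests."""
    return applied_percentage * latency_ms / 100


rng = random.Random(42)  # fixed seed so runs are repeatable
delays = [added_latency_ms(10, 1000, rng) for _ in range(10_000)]
print(sum(delays) / len(delays))  # close to the expected value below
print(expected_added_latency_ms(10, 1000))  # 100.0
```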

Simulate leader election errors

This interceptor is useful for testing applications to ensure they can survive leader elections, which happen:

  • When the leader dies and another broker needs to take over
  • During rolling upgrades

It simulates them by returning the following errors:

  • LEADER_NOT_AVAILABLE
  • NOT_LEADER_OR_FOLLOWER
  • BROKER_NOT_AVAILABLE
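An application survives these errors by retrying after a short, growing delay until a new leader is available. Kafka clients already retry some errors internally (for example, through the Java producer's retries and retry.backoff.ms settings). Below is a minimal Python sketch of the same idea at the application level.

BrokerError, call_with_retries and flaky_send are hypothetical names, not part of any Kafka client API. The sleep function is injectable so the example runs instantly.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# The errors this Interceptor returns, from the list above.
LEADER_ELECTION_ERRORS = {"LEADER_NOT_AVAILABLE", "NOT_LEADER_OR_FOLLOWER", "BROKER_NOT_AVAILABLE"}


class BrokerError(Exception):
    """Hypothetical exception carrying the Kafka error name returned by the broker."""

    def __init__(self, error_name: str) -> None:
        super().__init__(error_name)
        self.error_name = error_name


def call_with_retries(
    operation: Callable[[], T],
    max_attempts: int = 5,
    base_backoff_s: float = 0.1,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run operation, retrying with exponential backoff on leader-election errors."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except BrokerError as err:
            if err.error_name not in LEADER_ELECTION_ERRORS or attempt == max_attempts:
                raise  # a different error, or no attempts left
            sleep(base_backoff_s * 2 ** (attempt - 1))  # 0.1 s, 0.2 s, 0.4 s, ...
    raise RuntimeError("unreachable")


# Usage: an operation that fails twice during a simulated election, then succeeds.
pending_errors = iter(["LEADER_NOT_AVAILABLE", "NOT_LEADER_OR_FOLLOWER"])


def flaky_send() -> str:
    error = next(pending_errors, None)
    if error is not None:
        raise BrokerError(error)
    return "ack"


waits: list[float] = []
print(call_with_retries(flaky_send, sleep=waits.append), waits)  # ack [0.1, 0.2]
```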

Configuration

| key | type | description |
|---|---|---|
| rateInPercent | int | The percentage of requests that will result in a leader or broker not available response |

Example
```json
{
  "name": "mySimulateLeaderElectionsErrorsPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.SimulateLeaderElectionsErrorsPlugin",
  "priority": 100,
  "config": {
    "rateInPercent": 100
  }
}
```

Simulate message corruption

From time to time, messages will arrive that are not in the expected format.

This interceptor appends random bytes to the end of the data in records.
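A consumer that expects JSON can catch the resulting decoding errors and set the bad record aside instead of crashing. Below is a minimal Python sketch.

The function name split_records and the appended bytes are hypothetical. Appended random bytes do not always cause a parse error (for example, if they happen to be whitespace), so parsing alone may not detect every corrupted record.

```python
import json


def split_records(values: list[bytes]) -> tuple[list[object], list[bytes]]:
    """Parse each record value as UTF-8 JSON; collect values that fail to parse."""
    parsed: list[object] = []
    dead_letters: list[bytes] = []
    for value in values:
        try:
            parsed.append(json.loads(value.decode("utf-8")))
        except ValueError:  # UnicodeDecodeError and json.JSONDecodeError both subclass ValueError
            dead_letters.append(value)  # keep the raw bytes for inspection or a dead-letter topic
    return parsed, dead_letters


good = b'{"id": 1}'
corrupted = good + b"\xff\x00\x13"  # hypothetical bytes appended by the Interceptor
parsed, dead_letters = split_records([good, corrupted])
print(parsed, len(dead_letters))  # [{'id': 1}] 1
```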

Configuration

| key | type | default | description |
|---|---|---|---|
| topic | String | `.*` | Regular expression matching the topics whose records will be corrupted. |
| sizeInBytes | int | 10 | Number of random bytes to append to the record data. |
| rateInPercent | int | 100 | Percentage of records that will have random bytes appended. |

You can simulate corruption when:

  • sending data: io.conduktor.gateway.interceptor.chaos.ProduceSimulateMessageCorruptionPlugin
  • reading data: io.conduktor.gateway.interceptor.chaos.FetchSimulateMessageCorruptionPlugin
Example
```json
{
  "name": "mySimulateMessageCorruptionInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.FetchSimulateMessageCorruptionPlugin",
  "priority": 100,
  "config": {
    "topic": "client_topic_.*",
    "sizeInBytes": 100,
    "rateInPercent": 100
  }
}
```

Simulate slow brokers

This Interceptor slows responses from the brokers. This only works on Produce and Fetch requests.

Configuration

| key | type | description |
|---|---|---|
| rateInPercent | int | The percentage of requests that will have the interceptor applied |
| minLatencyMs | int | Minimum for the random response latency in milliseconds |
| maxLatencyMs | int | Maximum for the random response latency in milliseconds |

Example
```json
{
  "name": "mySimulateSlowBrokerInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.SimulateSlowBrokerPlugin",
  "priority": 100,
  "config": {
    "rateInPercent": 100,
    "minLatencyMs": 50,
    "maxLatencyMs": 1200
  }
}
```
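When choosing minLatencyMs and maxLatencyMs, it helps to compare the worst-case delay with the client's request.timeout.ms setting. Below is a minimal Python sketch.

It assumes the latency is drawn uniformly between the two bounds; the documentation above only says it is random. The function names are hypothetical, and 30 000 ms is just an example timeout value.

```python
import random


def sample_latency_ms(min_latency_ms: int, max_latency_ms: int, rng: random.Random) -> int:
    """Draw one response delay, assuming a uniform distribution over [min, max]."""
    if min_latency_ms > max_latency_ms:
        raise ValueError("minLatencyMs must not exceed maxLatencyMs")
    return rng.randint(min_latency_ms, max_latency_ms)


def worst_case_headroom_ms(request_timeout_ms: int, max_latency_ms: int) -> int:
    """Time left before the client's request timeout if a response gets the maximum delay.

    Ignores the broker's own processing time, so the real headroom is smaller.
    """
    return request_timeout_ms - max_latency_ms


rng = random.Random(7)  # fixed seed so runs are repeatable
samples = [sample_latency_ms(50, 1200, rng) for _ in range(1_000)]
print(min(samples) >= 50 and max(samples) <= 1200)  # True
print(worst_case_headroom_ms(30_000, 1200))  # 28800
```

A negative headroom means some delayed requests would time out on the client. That may be exactly the failure you want to test, but it should be deliberate.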

Simulate slow producers and consumers

This Interceptor slows responses from the brokers, but only for a set of topics rather than for all traffic.

This Interceptor only works on Produce and Fetch requests.

Configuration

| key | type | default | description |
|---|---|---|---|
| topic | String | `.*` | Topics that match this regex will have the interceptor applied. |
| rateInPercent | int | | The percentage of requests that will have the interceptor applied |
| minLatencyMs | int | | Minimum for the random response latency in milliseconds |
| maxLatencyMs | int | | Maximum for the random response latency in milliseconds |

Example
```json
{
  "name": "mySimulateSlowProducersConsumersInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.SimulateSlowProducersConsumersPlugin",
  "priority": 100,
  "config": {
    "rateInPercent": 100,
    "minLatencyMs": 50,
    "maxLatencyMs": 1200
  }
}
```
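Several Interceptors above select topics with a regular expression in the topic key. Its default is .*, so the example above, which omits topic, applies to every topic. Below is a minimal Python sketch for checking which topic names a pattern selects.

It assumes the whole topic name must match. The plugin class names suggest a JVM implementation, so the Gateway's exact regex dialect and matching mode may differ from Python's. The function and topic names are hypothetical.

```python
import re


def matching_topics(pattern: str, topics: list[str]) -> list[str]:
    """Return the topics whose whole name matches pattern (full-match is an assumption)."""
    compiled = re.compile(pattern)
    return [topic for topic in topics if compiled.fullmatch(topic)]


topics = ["client_topic_orders", "client_topic_payments", "orders", "my_client_topic_x"]
print(matching_topics("client_topic_.*", topics))  # ['client_topic_orders', 'client_topic_payments']
print(matching_topics(".*", topics) == topics)  # True
```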