Tuesday, August 11, 2020

How to produce and consume messages using Kafka Rest Proxy

The Kafka REST Proxy provides a RESTful interface to an Apache Kafka cluster for producing and consuming messages and for performing administrative operations. It lets you consume messages from topics or from specific topic partitions, and produce messages to topics or partitions.


1) Produce a JSON message with the value '{ "foo": "bar" }' to the topic test1

Command: curl -u mapr:mapr -k -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"foo":"bar"}}]}' "https://localhost:8082/topics/test1"

Output :
  {
   "offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null
  }
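The same produce request can be sketched in Python with only the standard library. This is a minimal sketch, not part of the original walkthrough: the helper names (basic_auth_header, build_produce_request, failed_offsets) are hypothetical, and the host, port, and mapr:mapr credentials are simply copied from the curl command above. The request is built but not sent.

```python
import base64
import json
import urllib.request

BASE_URL = "https://localhost:8082"  # same host and port as the curl example


def basic_auth_header(user: str, password: str) -> str:
    """Return the value of an HTTP Basic Authorization header (curl -u)."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


def build_produce_request(topic: str, values: list,
                          user: str = "mapr", password: str = "mapr") -> urllib.request.Request:
    """Build (but do not send) the POST that produces JSON records to a topic."""
    body = json.dumps({"records": [{"value": v} for v in values]}).encode("utf-8")
    return urllib.request.Request(
        url=f"{BASE_URL}/topics/{topic}",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/vnd.kafka.json.v2+json",
            "Accept": "application/vnd.kafka.v2+json",
            "Authorization": basic_auth_header(user, password),
        },
    )


def failed_offsets(response_text: str) -> list:
    """Return the per-record entries whose error_code is not null."""
    offsets = json.loads(response_text).get("offsets", [])
    return [entry for entry in offsets if entry.get("error_code") is not None]
```

To actually send the request, pass it to urllib.request.urlopen. Mimicking curl's -k flag would additionally require an SSL context with certificate verification disabled, which is only reasonable on a test cluster.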


2) Create a consumer instance for JSON data that starts at the beginning of the topic's log. The response contains the base URI of the new instance, which the following steps use to subscribe to a topic and consume data.

Command: curl -u mapr:mapr -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' https://localhost:8082/consumers/my_json_consumer

Output :
 {
  "instance_id":"my_consumer_instance",
  "base_uri":"http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance"
 }
 
 
3) Subscribe the consumer to the topic test1

Command: curl -u mapr:mapr -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["test1"]}' https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription

Output : No content in response


4) List the subscribed topics

Command: curl -u mapr:mapr -k -X GET -H "Accept: application/vnd.kafka.v2+json" https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription

Output : {"topics":["/tmp/kafka:test"]}


5) Consume the records for the topic

Command: curl -u mapr:mapr -k -X GET -H "Accept: application/vnd.kafka.json.v2+json" https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

Output :
  [
   {"key":null,"value":{"foo":"bar"},"partition":0,"offset":0,"topic":"test1"}
  ]
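The records response is a JSON array with one object per message, so it is easy to post-process. The following is a small sketch of one way to do that in Python. The Record type and the parse_records and next_offsets helpers are hypothetical names introduced here for illustration, and the field names are taken from the output above.

```python
import json
from typing import Any, NamedTuple


class Record(NamedTuple):
    """One entry of the array returned by GET .../records."""
    topic: str
    partition: int
    offset: int
    key: Any
    value: Any


def parse_records(response_text: str) -> list:
    """Turn the JSON array from the records endpoint into Record tuples."""
    return [
        Record(r["topic"], r["partition"], r["offset"], r.get("key"), r.get("value"))
        for r in json.loads(response_text)
    ]


def next_offsets(records: list) -> dict:
    """Map (topic, partition) to the offset after the highest one seen."""
    result = {}
    for rec in records:
        tp = (rec.topic, rec.partition)
        result[tp] = max(result.get(tp, 0), rec.offset + 1)
    return result
```

An empty array simply yields an empty list, which is what a poll returns when no new messages have arrived.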


6) Finally, close the consumer with a DELETE request so that it leaves the group and releases its resources.

Command: curl -u mapr:mapr -k -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance

Output : No content in response
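Because a consumer instance holds resources on the proxy until it is deleted, a client usually wants the DELETE to run even when an earlier step fails. The sketch below shows that pattern under stated assumptions: consume_once and the Send transport signature are hypothetical, the transport is injected so the flow can be exercised without a running proxy, and the paths are built the same way as in the curl commands rather than read from base_uri.

```python
import json
from typing import Callable, Optional

# Hypothetical transport signature: (method, path, headers, body) -> response text.
Send = Callable[[str, str, dict, Optional[bytes]], str]

V2 = "application/vnd.kafka.v2+json"
JSON_V2 = "application/vnd.kafka.json.v2+json"


def consume_once(send: Send, group: str, instance: str, topics: list) -> list:
    """Create a consumer, subscribe, fetch records once, and always delete it."""
    create_body = json.dumps(
        {"name": instance, "format": "json", "auto.offset.reset": "earliest"}
    ).encode("utf-8")
    send("POST", f"/consumers/{group}", {"Content-Type": V2}, create_body)

    instance_path = f"/consumers/{group}/instances/{instance}"
    try:
        subscribe_body = json.dumps({"topics": topics}).encode("utf-8")
        send("POST", f"{instance_path}/subscription", {"Content-Type": V2}, subscribe_body)
        text = send("GET", f"{instance_path}/records", {"Accept": JSON_V2}, None)
        return json.loads(text) if text else []
    finally:
        # Runs even if subscribing or fetching raised, so the instance is not
        # left behind until the proxy's idle timeout removes it.
        send("DELETE", instance_path, {"Content-Type": V2}, None)
```

A real send function would prepend the proxy's base URL, add authentication, and perform the HTTP call. Keeping it as a parameter also makes the sequence of requests easy to verify with a stub.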


Additional Configurations :

1) consumer.instance.timeout.ms is the amount of idle time (in milliseconds) before a consumer instance is automatically destroyed. Type: int. Default: 300000 (5 minutes).
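A long-running client can keep its own estimate of whether an instance has probably idled out and needs to be recreated. The sketch below is only a client-side approximation: IdleTracker is a hypothetical helper, it assumes that every request to the instance counts as activity, and the proxy remains the authority on when an instance is actually destroyed.

```python
import time
from typing import Callable

DEFAULT_INSTANCE_TIMEOUT_MS = 300000  # default quoted above (5 minutes)


class IdleTracker:
    """Client-side estimate of whether a consumer instance has idled out."""

    def __init__(self, timeout_ms: int = DEFAULT_INSTANCE_TIMEOUT_MS,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._timeout_s = timeout_ms / 1000.0
        self._clock = clock  # injectable so the logic can be tested without waiting
        self._last_used = clock()

    def touch(self) -> None:
        """Call after every request made to the consumer instance."""
        self._last_used = self._clock()

    def likely_expired(self) -> bool:
        """True once the idle time reaches the configured timeout."""
        return self._clock() - self._last_used >= self._timeout_s
```

If the proxy is configured with a different consumer.instance.timeout.ms, pass the same value as timeout_ms so the estimate matches the server setting.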
