Messaging System
A messaging system is responsible for transferring data from one application to another. Distributed messaging is based on the concept of reliable message queuing: messages are queued asynchronously between client applications and the messaging system. Two messaging patterns are available − point-to-point and publish-subscribe (pub-sub).
Point-to-Point Messaging System
In a point-to-point system, messages are persisted in a queue. One or more consumers can consume the messages in the queue, but a particular message can be consumed by at most one consumer. Once a consumer reads a message from the queue, it disappears from that queue. A typical example of this system is an order processing system, where each order is processed by exactly one order processor.
Publish-Subscribe Messaging System
In the publish-subscribe system, messages are persisted in a topic. Unlike the point-to-point system, consumers can subscribe to one or more topics and consume all the messages in those topics. In the publish-subscribe system, message producers are called publishers and message consumers are called subscribers. Real-life examples are newspapers and TV channels.
Kafka
Apache Kafka is a distributed publish-subscribe
messaging system and a robust queue that can handle a high volume of data and
enables you to pass messages from one end-point to another. Kafka is suitable
for both offline and online message consumption. Kafka messages are persisted
on the disk and replicated within the cluster to prevent data loss. Kafka is
built on top of the ZooKeeper synchronization service. It integrates very well
with Apache Storm and Spark for real-time streaming data analysis.
· Kafka is log based
· Each message gets a unique log sequence number (the offset)
· Stored data is persistent
· Data can be shared between applications
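To make the publish-subscribe side concrete, here is a minimal sketch of a Java producer that publishes a few records to a topic. The broker address, topic name, and key/value strings are illustrative assumptions; records that share the same key are routed to the same partition by the default partitioner, which is what preserves their relative order.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
   public static void main(String[] args) {
      // Assumed broker address and topic name; adjust for your cluster.
      String topicName = "Topic1";
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      KafkaProducer<String, String> producer = new KafkaProducer<>(props);
      for (int i = 0; i < 10; i++) {
         // Records with the same key (e.g. an order id) land in the same
         // partition, so their relative order is preserved.
         producer.send(new ProducerRecord<>(topicName, "order-42", "event-" + i));
      }
      producer.flush();
      producer.close();
      System.out.println("Published 10 messages to " + topicName);
   }
}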
Following are a few benefits of Kafka −
· Reliability − Kafka is distributed, partitioned, replicated and fault tolerant.
· Scalability − The Kafka messaging system scales easily without downtime.
· Durability − Kafka uses a distributed commit log, which means messages are persisted on disk as quickly as possible, hence it is durable.
· Performance − Kafka has high throughput for both publishing and subscribing messages. It maintains stable performance even when many TB of messages are stored.
· Replay − reset the offset and reprocess messages (see the sketch after this list)
· Extremely high throughput
· Fault tolerance
· High availability
· Replication − partitions are usually replicated across multiple nodes (servers)
· Kafka is starting to replace the Lambda architecture (stream processing, aggregations)
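The sketch below illustrates the replay idea. It is a minimal example, assuming the hypothetical topic Topic1 and broker address used elsewhere in these notes: after the group assigns partitions, seekToBeginning rewinds the offsets so previously consumed messages are processed again.
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReplayConsumer {
   public static void main(String[] args) {
      // Assumed broker, group, and topic names for illustration.
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "replay-group");
      props.put("enable.auto.commit", "false");
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");

      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Arrays.asList("Topic1"));

      // Poll until the group coordinator has assigned partitions to this consumer.
      while (consumer.assignment().isEmpty()) {
         consumer.poll(Duration.ofMillis(100));
      }
      // Rewind every assigned partition to the earliest available offset.
      consumer.seekToBeginning(consumer.assignment());

      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<String, String> record : records) {
         System.out.printf("replayed offset = %d, value = %s%n",
            record.offset(), record.value());
      }
      consumer.close();
   }
}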
Challenges in Kafka
· Requires a separate ZooKeeper cluster
· Requires meticulous planning to select the partition count
· Storage management overhead
· Ordering is guaranteed within a partition but not across an entire topic.
Cluster Architecture
Role of ZooKeeper
Example − a simple Java consumer that subscribes to a topic and prints the offset, key, and value of each record:
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      // Kafka consumer configuration settings
      String topicName = "Topic1";
      Properties props = new Properties();
      // bootstrap servers can be 1, 2…n
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      // The Kafka consumer subscribes to a list of topics here.
      consumer.subscribe(Arrays.asList(topicName));
      // print the topic name
      System.out.println("Subscribed to topic " + topicName);
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
            // print the offset, key and value for the consumer records
            System.out.printf("offset = %d, key = %s, value = %s\n",
               record.offset(), record.key(), record.value());
      }
   }
}
/usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties &
/usr/bin/kafka-server-start /etc/kafka/server.properties &
/usr/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 2 --topic topic-name
/usr/bin/kafka-topics.sh --list --zookeeper localhost:2181
/usr/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topic-name --time -1
/usr/bin/kafka-topics --zookeeper localhost:2181 --alter --topic topic-name --partitions 4
/usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group-name
Set the offset by offset number
Create /tmp/consumer.properties with group.id=my-group and max.poll.records=1
/usr/bin/kafka-console-consumer --topic topic-name --max-messages 0 --consumer.config /tmp/consumer.properties --partition 2 --bootstrap-server localhost:9092 --offset 7000
Set the offset by timestamp
Stop the consumers first, then reset the offsets to the desired timestamp; see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090 for details. A programmatic sketch follows.
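As a minimal sketch (assuming the group is stopped and that the broker, group, topic, and partition below are placeholders), a consumer can be rewound to a timestamp programmatically with offsetsForTimes and seek:
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class ResetToTimestamp {
   public static void main(String[] args) {
      // Assumed broker, group, topic, and partition; adjust as needed.
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "my-group");
      props.put("enable.auto.commit", "false");
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");

      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      TopicPartition tp = new TopicPartition("topic-name", 2);
      consumer.assign(Collections.singletonList(tp));

      // Find the earliest offset whose timestamp is >= one hour ago.
      long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000L;
      Map<TopicPartition, OffsetAndTimestamp> offsets =
         consumer.offsetsForTimes(Collections.singletonMap(tp, oneHourAgo));
      OffsetAndTimestamp target = offsets.get(tp);
      if (target != null) {
         consumer.seek(tp, target.offset());
         // Committing pins the group's position to the new offset.
         consumer.commitSync(Collections.singletonMap(tp,
            new OffsetAndMetadata(target.offset())));
      }
      consumer.close();
   }
}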
Kafka Connect
Features of Kafka Connect
1. A common framework for Kafka connectors
It standardizes integration of other data systems with Kafka. It also simplifies connector development, deployment, and management.
2. Distributed and standalone modes
Scale up to a large, centrally managed service supporting an entire organization or scale down to development, testing, and small production deployments.
3. REST interface
Through an easy-to-use REST API, we can submit and manage connectors in our Kafka Connect cluster.
4. Automatic offset management
Kafka Connect can manage the offset commit process automatically, even with only minimal information from connectors. Hence, connector developers do not need to worry about this error-prone part of connector development.
5. Distributed and scalable by default
It builds upon the existing group management protocol, and a Kafka Connect cluster can be scaled up simply by adding more workers.
6. Streaming/batch integration
Kafka Connect is an ideal solution for bridging streaming and batch data systems.
Kafka Connect is event based: when an insert/update/delete happens to a record, an event is triggered.
Kafka Connect includes two types of connectors:
- Source connector – Ingests entire tables/databases and streams table updates to Kafka topics. A source connector can also collect metrics from all your application servers and store these in Kafka topics, making the data available for stream processing with low latency.
- Sink connector – Delivers data from Kafka topics into secondary indexes such as Cassandra, Elasticsearch, or batch systems such as Hadoop for offline analysis.
Confluent Schema Registry
Although Schema Registry is not a required service for Kafka Connect, it enables you to easily use Avro and JSON Schema as common data formats for the Kafka records that connectors read from and write to. This keeps the need to write custom code at a minimum and standardizes your data in a flexible format. You also get the added benefit of schema evolution and enforced compatibility rules.
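To make the Avro usage concrete, here is a minimal sketch of a plain Java producer that publishes Avro records through Schema Registry; the topic name, schema, and Schema Registry URL are assumptions, and the Confluent kafka-avro-serializer dependency is required on the classpath.
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducerSketch {
   // Hypothetical record schema used only for illustration.
   private static final String USER_SCHEMA =
      "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"int\"},"
      + "{\"name\":\"name\",\"type\":\"string\"}]}";

   public static void main(String[] args) {
      // Assumed broker and Schema Registry addresses.
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
         "io.confluent.kafka.serializers.KafkaAvroSerializer");
      props.put("schema.registry.url", "http://localhost:8081");

      Schema schema = new Schema.Parser().parse(USER_SCHEMA);
      GenericRecord user = new GenericData.Record(schema);
      user.put("id", 1);
      user.put("name", "alice");

      // The Avro serializer registers the schema with Schema Registry on first
      // use and embeds the schema id in each record it writes.
      KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
      producer.send(new ProducerRecord<>("users", "1", user));
      producer.flush();
      producer.close();
   }
}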
Standalone vs. Distributed Mode
Connectors and tasks are
logical units of work and run as a process. This process is called a worker
in Kafka Connect. There are two modes for running workers: standalone mode
and distributed mode.
Standalone mode is useful for developing and testing Kafka Connect on a local machine. It can also be used for environments that typically use single agents.
/usr/bin/connect-standalone worker.properties connector1.properties [connector2.properties ...]
The first parameter worker.properties is the worker configuration properties file. This will be present at /etc/schema-registry/connect-avro-standalone.properties.
The second parameter
connector1.properties is the connector (source/sink) configuration properties
file. All connectors have configuration properties that are loaded with the worker.
As shown in the above example, you can launch multiple connectors using this
command.
Distributed mode runs Connect workers on multiple machines (nodes), which form a Connect cluster. Kafka Connect distributes running connectors across the cluster. You can add or remove nodes as your needs evolve. Distributed mode is also more fault tolerant: if a node unexpectedly leaves the cluster, Kafka Connect automatically distributes the work of that node to other nodes in the cluster. And, because Kafka Connect stores connector and task configurations, offsets, and status in Kafka topics rather than on a local filesystem, this state survives the loss of any individual worker.
/usr/bin/connect-distributed worker.properties
The parameter worker.properties is the worker configuration properties file. This will be present at /etc/schema-registry/connect-avro-distributed.properties.
In standalone mode, connector configuration
property files are added as command-line parameters. But in distributed mode,
connectors are deployed and managed using a REST API request.
Connect Internal Topics
Connect stores connector and
task configurations, offsets, and status in several Kafka topics.
These are referred to as Kafka Connect internal topics.
Distributed Workers that are configured with matching group.id
values automatically discover each other and form a Kafka Connect cluster. All
Workers in the cluster use the same three internal topics to share connector
configurations, offset data, and status updates. For this reason all
distributed worker configurations in the same Connect cluster must have
matching config.storage.topic, offset.storage.topic, and status.storage.topic
properties.
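As a quick way to confirm these internal topics exist, the sketch below uses the Kafka AdminClient to list them, assuming a broker at localhost:9092 and the default topic names configured in the worker file shown later (connect-configs, connect-offsets, connect-statuses):
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ListConnectInternalTopics {
   public static void main(String[] args) throws Exception {
      // Assumed broker address.
      Properties props = new Properties();
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

      try (AdminClient admin = AdminClient.create(props)) {
         // List all topic names and keep the Connect internal ones.
         Set<String> topics = admin.listTopics().names().get();
         topics.stream()
               .filter(t -> t.startsWith("connect-"))
               .forEach(System.out::println);
      }
   }
}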
Avro
To use the AvroConverter with Schema Registry, you specify
the key.converter and value.converter properties in the worker
configuration. An additional converter property must also be added that
provides the Schema Registry URL
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Worker properties (Distributed Mode)
# connect-avro-distributed.properties file
# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092
# The group ID is a unique identifier for the set of workers that form a single Kafka Connect cluster.
group.id=connect-cluster
# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka.
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://<Server IP/hostname>:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://<Server IP/hostname>:8081
# The following properties set the names of the three internal topics for storing configs, offsets, and status.
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
# The following properties set the replication factor for the three internal topics.
# A replication factor of 3 requires at least 3 brokers in the cluster; for a single-broker
# development setup, set these to 1. ALWAYS use a replication factor of AT LEAST 3 in production
# environments to reduce the risk of losing connector offsets, configurations, and status.
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
# The offsets, status, and configurations are written to the topics using converters specified through
# the following required properties. Most users will always want to use the JSON converter without schemas.
# Offset and config data is never visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# These are provided to inform the user about the presence of the REST host and port configs.
# Hostname & port for the REST API to listen on. If this is set, it will bind to the interface
# used to listen to requests.
rest.host.name=<Server IP/hostname>
rest.port=8083
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java
Connect REST Interface
Since Kafka Connect is
intended to be run as a service, it also supports a REST API for managing
connectors. When executed in distributed mode, the REST API will be the primary
interface to the cluster. You can make requests to any cluster member.
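For example, here is a minimal Java sketch that calls the GET /connectors endpoint, assuming a worker REST endpoint at http://localhost:8083 (Java 11+ HttpClient):
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListConnectors {
   public static void main(String[] args) throws Exception {
      // Assumed Connect worker REST endpoint.
      HttpClient client = HttpClient.newHttpClient();
      HttpRequest request = HttpRequest.newBuilder()
         .uri(URI.create("http://localhost:8083/connectors"))
         .header("Accept", "application/json")
         .GET()
         .build();

      // The response body is a JSON array of connector names.
      HttpResponse<String> response =
         client.send(request, HttpResponse.BodyHandlers.ofString());
      System.out.println(response.statusCode() + " " + response.body());
   }
}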
Connectors
GET /connectors
Get a list of active connectors.
Response JSON Object:
· connectors (array) – List of connector names
Example request:
GET /connectors
Host: connect.example.com
Accept: application/json
Example response:
HTTP/1.1 200 OK
Content-Type: application/json
["testOracle-distributed-Jdbc-Source ", " testOracle-distributed-Jdbc-Sink"]
POST /connectors
Create a new connector,
returning the current connector info if successful. Return 409 (Conflict) if rebalance is in process.
Request JSON Object:
· name (string) – Name of the connector to create
· config (map) – Configuration parameters for the connector. All values should be strings.
Response JSON Object:
· name (string) – Name of the created connector
· config (map) – Configuration parameters for the connector.
· tasks (array) – List of active tasks generated by the connector
· tasks[i].connector (string) – The name of the connector the task belongs to
· tasks[i].task (int) – Task ID within the connector.
Example request:
JDBC-Source
POST /connectors HTTP/1.1
Host: connect.example.com
Content-Type: application/json
Accept: application/json
{
"name": "testOracle-distributed-Jdbc-Source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"name": "testOracle-distributed-Jdbc-Source",
"dialect.name": "OracleDatabaseDialect",
"connection.url": "jdbc:oracle:thin:@<hostName/IP>:1521/<schemaName>",
"connection.user": "*********",
"connection.password": "******",
"tasks.max": "10",
"topic.prefix": "ORACLE-",
"table.whitelist": "KAFKA_CONN_SOURCE",
"mode": "timestamp+incrementing",
"timestamp.column.name":"MODIFIED",
"incrementing.column.name": "ID",
"numeric.mapping": "best_fit",
"value.converter.schema.registry.url": "http:// <hostName/IP>:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http:// <hostName/IP>:8081"
}
}
JDBC-Sink
POST /connectors HTTP/1.1
Host: connect.example.com
Content-Type: application/json
Accept: application/json
{
"name": "testOracle-distributed-Jdbc-Sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"name": "testOracle-distributed-Jdbc-Sink",
"connection.url": "jdbc:oracle:thin:@ <hostName/IP>:1521/<schemaName>",
"connection.user": "*******",
"connection.password": "***********",
"dialect.name": "OracleDatabaseDialect",
"table.name.format": "KAFKA_CONN_SINK",
"tasks.max": "1",
"topics": "ORACLE-KAFKA_CONN_SOURCE",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "ID",
"auto.create": "true",
"auto.evolve": "true",
"numeric.mapping": "best_fit",
"transforms": "DropField,RenameField",
"transforms.DropField.blacklist": "LNAME",
"transforms.RenameField.renames": "FNAME:FULL_NAME",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.DropField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"value.converter.schema.registry.url": "http:// <hostName/IP>:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http:// <hostName/IP>:8081"
}
}
Different modes for JDBC source connectors:
incrementing: use a strictly
incrementing column on each table to detect only new rows. Note that this will
not detect modifications or deletions of existing rows.
mode=incrementing
incrementing.column.name=TBL_ID
bulk: perform a bulk load of the entire table each time it is polled.
mode=bulk
timestamp: use a timestamp (or timestamp-like) column to detect new and modified rows. This assumes the column is updated with each write, and that values are monotonically incrementing, but not necessarily unique.
mode=timestamp
timestamp.column.name=custom_timestamp
timestamp+incrementing: use two columns, a timestamp column that detects new and modified rows and a strictly incrementing column which provides a globally unique ID for updates so each row can be assigned a unique stream offset.
mode=timestamp+incrementing
timestamp.column.name=custom_timestamp
incrementing.column.name=TBL_ID
Different insert modes for JDBC sink connectors:
Insert: Use standard SQL INSERT
statements.
Upsert: Use the appropriate
upsert semantics for the target database if it is supported by the connector,
e.g. INSERT/UPDATE OR IGNORE.
Update: Use the appropriate
update semantics for the target database if it is supported by the connector,
e.g. UPDATE.
GET /connectors/<Name of the connector>
Get information about the connector.
Response JSON Object:
· name (string) – Name of the created connector
· config (map) – Configuration parameters for the connector.
· tasks (array) – List of active tasks generated by the connector
· tasks[i].connector (string) – The name of the connector the task belongs to
· tasks[i].task (int) – Task ID within the connector.
GET /connectors/<name of the connector>/config
Get the configuration for the
connector.
Response JSON Object:
· config (map) – Configuration parameters for the connector.
PUT /connectors/<Name of the connector>/config
Create a new connector using the
given configuration, or update the configuration for an existing
connector. Returns information about the connector after the change has been
made. Return 409 (Conflict) if rebalance is in process.
Request JSON Object:
· config (map) – Configuration parameters for the connector. All values should be strings.
Response JSON Object:
· name (string) – Name of the created connector
· config (map) – Configuration parameters for the connector.
· tasks (array) – List of active tasks generated by the connector
· tasks[i].connector (string) – The name of the connector the task belongs to
· tasks[i].task (int) – Task ID within the connector.
Example request:
PUT /connectors/testOracle-distributed-Jdbc-Source/config HTTP/1.1
Host: connect.example.com
Accept: application/json
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"name": "testOracle-distributed-Jdbc-Source",
"dialect.name": "OracleDatabaseDialect",
"connection.url": "jdbc:oracle:thin:@ <hostName/IP>::1521/pnpdev",
"connection.user": "********",
"connection.password": "**************",
"tasks.max": "10",
"topic.prefix": "ORACLE-",
"table.whitelist": "KAFKA_CONN_SOURCE",
"mode": "timestamp+incrementing",
"timestamp.column.name":"MODIFIED",
"incrementing.column.name": "ID",
"numeric.mapping": "best_fit",
"value.converter.schema.registry.url": "http:// <hostName/IP>::8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http:// <hostName/IP>::8081"
}
GET /connectors/<Name of the connector>/status
Get current status of the
connector, including whether it is running, failed or paused, which worker it
is assigned to, error information if it has failed, and the state of all its
tasks.
POST /connectors/<Name of the connector>/restart
Restart the connector. Return 409 (Conflict) if rebalance is in process.
PUT /connectors/<Name of the connector>/pause
Pause the connector and its tasks, which stops message processing until the connector is resumed. This call is asynchronous and the tasks will not transition to the PAUSED state at the same time.
PUT /connectors/<Name of the connector>/resume
Resume a paused connector or do nothing if the connector is not paused. This call is asynchronous and the tasks will not transition to the RUNNING state at the same time.
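Because pause and resume are asynchronous, a caller that needs to know when the transition has completed can poll the status endpoint. A minimal sketch, assuming a worker at http://localhost:8083 and the JDBC source connector name used above:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class WaitUntilPaused {
   public static void main(String[] args) throws Exception {
      // Assumed worker address and connector name.
      String statusUrl =
         "http://localhost:8083/connectors/testOracle-distributed-Jdbc-Source/status";
      HttpClient client = HttpClient.newHttpClient();
      HttpRequest request = HttpRequest.newBuilder()
         .uri(URI.create(statusUrl))
         .header("Accept", "application/json")
         .GET()
         .build();

      // Poll the status endpoint until the connector reports the PAUSED state.
      while (true) {
         HttpResponse<String> response =
            client.send(request, HttpResponse.BodyHandlers.ofString());
         // Crude string check on the JSON body; a real client would parse it.
         if (response.body().contains("\"state\":\"PAUSED\"")) {
            System.out.println("Connector is paused.");
            break;
         }
         Thread.sleep(1000);
      }
   }
}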
DELETE /connectors/<Name of the connector>/
Delete a connector, halting
all tasks and deleting its configuration. Return 409 (Conflict) if rebalance is in process.
Tasks
GET /connectors/<Name of the connector>/tasks
Get a list of tasks currently running for the connector.
GET /connectors/<Name of the connector>/tasks/<taskId>/status
Get a task’s status.
POST /connectors/<Name of the connector>/tasks/<taskId>/restart
Restart an individual task.
Connector Plugins
GET /connector-plugins/
Return a list of connector
plugins installed in the Kafka Connect cluster.
Some useful Connect commands on Unix
To Start the schema registry
/usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties &
To start the connect worker in distributed mode
/usr/bin/connect-distributed /etc/schema-registry/connect-avro-distributed.properties &
Some of the curl commands
curl <hostName/IP>:8083/connectors | jq
curl <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Source | jq
curl <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Sink | jq
curl <hostName/IP>:8083/connector-plugins | jq
curl -X POST <hostName/IP>:8083/connectors/s3-connector/restart
curl <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Source/tasks | jq
curl -X POST <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Source/tasks/0/restart
curl -X POST <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Sink/tasks/0/restart
curl <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Source/status | jq
curl <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Sink/status | jq
curl -X DELETE <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Source
curl -X DELETE <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Sink
https://docs.confluent.io/platform/current/connect/userguide.html#connect-userguide