Kafka


Messaging System

A messaging system is responsible for transferring data from one application to another. Distributed messaging is based on the concept of reliable message queuing: messages are queued asynchronously between client applications and the messaging system. Two messaging patterns are available − point-to-point and publish-subscribe (pub-sub).

Point to Point Messaging System

In a point-to-point system, messages are persisted in a queue. One or more consumers can consume the messages in the queue, but a particular message can be consumed by at most one consumer. Once a consumer reads a message from the queue, it disappears from that queue. A typical example of this system is an order processing system, where each order is processed by a single order processor.

Publish-Subscribe Messaging System

In the publish-subscribe system, messages are persisted in a topic. Unlike the point-to-point system, consumers can subscribe to one or more topics and consume all the messages in those topics. In the publish-subscribe system, message producers are called publishers and message consumers are called subscribers. Real-life examples are newspapers and TV channels.

Kafka

Apache Kafka is a distributed publish-subscribe messaging system and a robust queue that can handle a high volume of data and enables you to pass messages from one endpoint to another. Kafka is suitable for both offline and online message consumption. Kafka messages are persisted on disk and replicated within the cluster to prevent data loss. Kafka is built on top of the ZooKeeper synchronization service. It integrates very well with Apache Storm and Spark for real-time streaming data analysis.

·       Kafka is log based

·       Each message is identified by a unique log sequence number (offset)

·       Stored data is persistent in nature

·       Data can be shared between applications

Apache Kafka is broadly classified into three categories:
1. Messaging
2. Connect
3. Stream Processors



Benefits

Following are a few benefits of Kafka −

·      Reliability − Kafka is distributed, partitioned, replicated and fault tolerant.

·      Scalability − the Kafka messaging system scales easily without downtime.

·      Durability − Kafka uses a distributed commit log, which means messages are persisted on disk as quickly as possible, hence it is durable.

·      Performance − Kafka has high throughput for both publishing and subscribing messages. It maintains stable performance even when many terabytes of messages are stored.

·      Replay − consumers can reset their offset and reprocess messages.

·      Extremely high throughput

·      Fault tolerance

·      High availability

·      Replication − partitions are usually replicated across multiple nodes (servers).

·      Kafka is starting to replace the Lambda architecture (stream processing, aggregations).

Challenges in Kafka

·      Requires a separate Zookeeper cluster

·      Requires meticulous planning to select partition count

·      Storage management overhead

·      Ordering is guaranteed within a partition but not across the entire topic.

Kafka is very fast and, when configured with adequate replication, can provide close to zero downtime and zero data loss.
Use Cases
Kafka can be used in many Use Cases. Some of them are listed below −
·  Metrics − Kafka is often used for operational monitoring data. This involves aggregating statistics from distributed applications to produce centralized feeds of operational data.
·   Log Aggregation Solution − Kafka can be used across an organization to collect logs from multiple services and make them available in a standard format to multiple consumers.
·     Stream Processing − Popular frameworks such as Storm and Spark Streaming read data from a topic, process it, and write the processed data to a new topic, where it becomes available for users and applications. Kafka’s strong durability is also very useful in the context of stream processing.

Cluster Architecture


Producer: Application that sends the messages.
Consumer: Application that receives the messages.
Message: Information that is sent from the producer to a consumer through Apache Kafka.
Connection: A connection is a TCP connection between your application and the Kafka broker.
Topic: A topic is a category/feed name to which messages are stored and published. Messages are byte arrays that can store any object in any format. All Kafka messages are organized into topics. Messages published to the cluster will stay in the cluster until a configurable retention period has passed.
Topic partition: Kafka topics are divided into a number of partitions, which allows you to split data across multiple brokers. Each message in a partition is assigned and identified by its unique offset (see the AdminClient sketch after this list for creating a topic with a chosen partition count and replication factor).

Replicas: A replica of a partition is a "backup" of that partition. Replicas do not serve client reads or writes; they are used to prevent data loss.
Consumer Group: A consumer group includes the set of consumer processes that are subscribing to a specific topic.
Offset: The offset is a unique identifier of a record within a partition. It denotes the position of the consumer in the partition.
Node/Broker: A node is a single computer in the Apache Kafka cluster.
Cluster: A cluster is a group of nodes i.e., a group of computers.
ZooKeeper: ZooKeeper is used for managing and coordinating Kafka brokers. The ZooKeeper service is mainly used to notify producers and consumers about the presence of a new broker in the Kafka system or about the failure of a broker.
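
To make the topic/partition definitions above concrete, here is a minimal Java sketch that creates a topic using the AdminClient, mirroring the kafka-topics command shown later (2 partitions, replication factor 3); the topic name and broker address are placeholders.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
   public static void main(String[] args) throws Exception {
      Properties props = new Properties();
      // Any reachable broker in the cluster can be used for bootstrapping
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

      try (AdminClient admin = AdminClient.create(props)) {
         // Topic with 2 partitions, each partition replicated on 3 brokers
         NewTopic topic = new NewTopic("topic-name", 2, (short) 3);
         admin.createTopics(Collections.singleton(topic)).all().get();
      }
   }
}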

Role of ZooKeeper

A critical dependency of Apache Kafka is Apache Zookeeper, which is a distributed configuration and synchronization service. Zookeeper serves as the coordination interface between the Kafka brokers and consumers. The Kafka servers share information via a Zookeeper cluster. Kafka stores basic metadata in Zookeeper such as information about topics, brokers, consumer offsets (queue readers) and so on.
Consumers and consumer groups: Consumers can read messages starting from a specific offset and are allowed to read from any offset point they choose. This allows consumers to join the cluster at any point in time.

Consumers can join a group called a consumer group. A consumer group includes the set of consumer processes that are subscribing to a specific topic. Each consumer in the group is assigned a set of partitions to consume from, so each receives messages from a different subset of the partitions in the topic. Kafka guarantees that a message is only read by a single consumer in the group. Once a message is read, the consumer's offset is updated.
Assigning a group to a consumer:
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "groupName");

Geo-Replication
Kafka MirrorMaker provides geo-replication support for your clusters. With MirrorMaker, messages are replicated across multiple datacenters or cloud regions. You can use this in active/passive scenarios for backup and recovery, or in active/active scenarios to place data closer to your users or to support data-locality requirements.
Multi-tenancy
We can deploy Kafka as a multi-tenant solution. Multi-tenancy is enabled by controlling which clients can produce to or consume from which topics. There is also operations support for quotas: administrators can define and enforce quotas on requests to control the broker resources that are used by clients.
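
As an illustration of quota enforcement, the sketch below sets producer/consumer byte-rate quotas for a client id using the Java AdminClient. This assumes a Kafka clients library that includes the alterClientQuotas admin API (Kafka 2.6 or later); the client id "tenant-a" and the byte rates are hypothetical.

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

public class QuotaExample {
   public static void main(String[] args) throws Exception {
      Properties props = new Properties();
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      try (AdminClient admin = AdminClient.create(props)) {
         // Quota entity: the client whose client.id is "tenant-a" (hypothetical name)
         ClientQuotaEntity entity = new ClientQuotaEntity(
            Collections.singletonMap(ClientQuotaEntity.CLIENT_ID, "tenant-a"));
         // Limit produce to 1 MB/s and fetch to 2 MB/s for this client
         ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity, Arrays.asList(
            new ClientQuotaAlteration.Op("producer_byte_rate", 1048576.0),
            new ClientQuotaAlteration.Op("consumer_byte_rate", 2097152.0)));
         admin.alterClientQuotas(Collections.singleton(alteration)).all().get();
      }
   }
}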

Sample Kafka Producer
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
  
   public static void main(String[] args) throws Exception{

      String topicName = "Topic1";
      Properties props = new Properties();

      //Assign the broker list; it can be any number of servers
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName,
            Integer.toString(i), Integer.toString(i)));
      System.out.println("Messages sent successfully");
      producer.close();
   }
}

Sample Kafka Consumer
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
 
public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      //Kafka consumer configuration settings
      String topicName = "Topic1";
      Properties props = new Properties();
      //bootstrap servers can be 1, 2…n
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

Kafka Commands
Start Zookeeper

/usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties &

Start Kafka broker
/usr/bin/kafka-server-start /etc/kafka/server.properties &

Creating a Kafka topic
/usr/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
--partitions 2 --topic topic-name

List of Topics
/usr/bin/kafka-topics.sh --list --zookeeper localhost:2181

Deleting a Topic
/usr/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

Get Offsets for the topic
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topic-name --time -1
Use --time -1 for the latest available offset (the offset of the next message, i.e. messages not yet consumed).
Use --time -2 for the first (earliest) available offset.
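
The same earliest/latest offsets can also be fetched from a Java client; a minimal sketch (broker address and topic name as in the command above) using the consumer's beginningOffsets and endOffsets methods:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class TopicOffsets {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
         List<TopicPartition> partitions = consumer.partitionsFor("topic-name").stream()
            .map(p -> new TopicPartition(p.topic(), p.partition()))
            .collect(Collectors.toList());
         // Equivalent of --time -2: first available offset per partition
         Map<TopicPartition, Long> earliest = consumer.beginningOffsets(partitions);
         // Equivalent of --time -1: offset of the next message to be written per partition
         Map<TopicPartition, Long> latest = consumer.endOffsets(partitions);
         partitions.forEach(tp ->
            System.out.println(tp + " earliest=" + earliest.get(tp) + " latest=" + latest.get(tp)));
      }
   }
}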

Add Partitions
/usr/bin/kafka-topics --zookeeper localhost:2181 --alter --topic topic-name --partitions 4

Get offsets and lag for the topics consumed by a group
/usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group-name
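
The committed offsets of a consumer group can also be read programmatically; a minimal sketch using the Java AdminClient's listConsumerGroupOffsets method (broker address and group name as in the command above):

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupOffsets {
   public static void main(String[] args) throws Exception {
      Properties props = new Properties();
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      try (AdminClient admin = AdminClient.create(props)) {
         // Committed offset per partition for the group "group-name"
         Map<TopicPartition, OffsetAndMetadata> offsets =
            admin.listConsumerGroupOffsets("group-name").partitionsToOffsetAndMetadata().get();
         offsets.forEach((tp, om) -> System.out.println(tp + " committed=" + om.offset()));
      }
   }
}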

Set the offset by offset number
1. Stop the consumers
2. Create /tmp/consumer.properties with group.id=my-group and max.poll.records=1
3. /usr/bin/kafka-console-consumer --topic topic-name --max-messages 0 --consumer.config /tmp/consumer.properties --partition 2 --bootstrap-server localhost:9092 --offset 7000
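
The same kind of reset can be done from a Java consumer by assigning the partition explicitly and seeking to the desired offset; a minimal sketch (topic, partition 2, offset 7000 and group.id=my-group taken from the example above, broker address assumed):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekToOffset {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "my-group");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
         TopicPartition partition = new TopicPartition("topic-name", 2);
         consumer.assign(Collections.singletonList(partition));
         consumer.seek(partition, 7000L);   // continue reading from offset 7000
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
         for (ConsumerRecord<String, String> record : records)
            System.out.println("offset=" + record.offset() + " value=" + record.value());
      }
   }
}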
 
Set the offset by timestamp
1. Stop the consumers
2. Follow the procedure described at https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090
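
Alternatively, a Java consumer can locate the offset for a given timestamp with offsetsForTimes and seek to it; a minimal sketch (topic, partition, and the one-hour-ago timestamp are assumptions for illustration):

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class SeekToTimestamp {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "my-group");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
         TopicPartition partition = new TopicPartition("topic-name", 0);
         consumer.assign(Collections.singletonList(partition));
         // Find the first offset whose timestamp is >= the given epoch milliseconds
         long timestamp = System.currentTimeMillis() - 3600 * 1000L;  // one hour ago
         Map<TopicPartition, OffsetAndTimestamp> result =
            consumer.offsetsForTimes(Collections.singletonMap(partition, timestamp));
         OffsetAndTimestamp offset = result.get(partition);
         if (offset != null)
            consumer.seek(partition, offset.offset());
      }
   }
}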



Zookeeper Cluster Configuration
1. Update zookeeper.properties with all the nodes of the cluster
2. Create a myid file containing the ZooKeeper ID (1, 2 or 3) under the ZooKeeper data directory, in this case /apps/opt/zookeeper/data

zookeeper.properties (path: /etc/kafka):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/apps/opt/zookeeper/data
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=10
tickTime=2000
initLimit=10
maxSessionTimeout=8000
syncLimit=2
server.1=host1:2888:3888
server.2=host2:2888:3888
server.3=host3:2888:3888

Kafka Broker Setup
Update server.properties and log4j.properties (comment out the console appender) as shown below. The server.properties file is large, so only the important elements are listed here; the other settings keep their default values.
server.properties
# The id of the broker. This must be set to a unique integer for each broker.
# broker.id=1 (1, 2, 3….)
broker.id=1
listeners=PLAINTEXT://<hostname>:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dirs=/apps/opt/kafka/data
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# To be able to delete topics, the property below must be set to true
delete.topic.enable=false
auto.create.topics.enable=false
###################### Increase the message size ####################
# Added to increase the maximum message size
# message.max.bytes has to be smaller than the consumer fetch.message.max.bytes (default is 1000000)
message.max.bytes=5000000
# replica.fetch.max.bytes has to be larger than message.max.bytes
replica.fetch.max.bytes=5000001
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
####################### Zookeeper #######################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=host1:2181,host2:2181,host3:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


Kafka Connect
Apache Kafka Connect is a framework for streaming data between Apache Kafka and other systems. Connect makes it very simple to quickly define Kafka connectors that move large collections of data into and out of Kafka.
Kafka Connect can be used alongside Kafka messaging. For example, the source can be an application and the sink can be a database, or vice versa.


Kafka Connect Features
1. A common framework for Kafka connectors
It standardizes the integration of other data systems with Kafka and simplifies connector development, deployment, and management.
2. Distributed and standalone modes
Scale up to a large, centrally managed service supporting an entire organization, or scale down to development, testing, and small production deployments.
3. REST interface
Through an easy-to-use REST API, we can submit and manage connectors in our Kafka Connect cluster.
4. Automatic offset management
Kafka Connect can manage the offset commit process automatically, with only minimal information from connectors. Hence, connector developers do not need to worry about this error-prone part of connector development.
5. Distributed and scalable by default
It builds upon the existing group management protocol, and to scale up a Kafka Connect cluster we can simply add more workers.
6. Streaming/batch integration
Kafka Connect is an ideal solution for bridging streaming and batch data systems.

Kafka Connect is event based: when an insert/update/delete happens to a record, the corresponding event is triggered.

Kafka Connect includes two types of connectors:

  • Source connector – Ingests entire tables/databases and streams table updates to Kafka topics. A source connector can also collect metrics from all your application servers and store these in Kafka topics, making the data available for stream processing with low latency.
  • Sink connector – Delivers data from Kafka topics into secondary indexes such as Cassandra, Elasticsearch, or batch systems such as Hadoop for offline analysis. 

Confluent Schema Registry

Although Schema Registry is not a required service for Kafka Connect, it enables you to easily use Avro and JSON Schema as common data formats for the Kafka records that connectors read from and write to. This keeps the need to write custom code at a minimum and standardizes your data in a flexible format. You also get the added benefit of schema evolution and enforced compatibility rules. 
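
For context, the sketch below shows how a plain Java producer can publish Avro records through Schema Registry using Confluent's KafkaAvroSerializer (this assumes the Confluent kafka-avro-serializer dependency is on the classpath); the schema, topic name, record values, and URLs are hypothetical examples, not values from this document.

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducerExample {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      // The Confluent Avro serializer registers the schema with Schema Registry on first use
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
      props.put("schema.registry.url", "http://localhost:8081");

      // Hypothetical Avro schema, for illustration only
      Schema schema = new Schema.Parser().parse(
         "{\"type\":\"record\",\"name\":\"Employee\"," +
         "\"fields\":[{\"name\":\"ID\",\"type\":\"long\"},{\"name\":\"FNAME\",\"type\":\"string\"}]}");
      GenericRecord value = new GenericData.Record(schema);
      value.put("ID", 1L);
      value.put("FNAME", "John");

      try (Producer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
         producer.send(new ProducerRecord<>("avro-topic", "1", value));
      }
   }
}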

Standalone vs. Distributed Mode

Connectors and tasks are logical units of work and run as a process. This process is called a worker in Kafka Connect. There are two modes for running workers: standalone mode and distributed mode.

Standalone mode is useful for development and testing Kafka Connect on a local machine. It can also be used for environments that typically use single agents.

/usr/bin/connect-standalone worker.properties connector1.properties [connector2.properties ...]

 The first parameter worker.properties is the worker configuration properties file. This will be present at /etc/schema-registry/connect-avro-standalone.properties.

The second parameter connector1.properties is the connector (source/sink) configuration properties file. All connectors have configuration properties that are loaded with the worker. As shown in the above example, you can launch multiple connectors using this command.

Distributed mode runs Connect workers on multiple machines (nodes). These form a Connect cluster. Kafka Connect distributes running connectors across the cluster. You can add more nodes or remove nodes as your needs evolve. Distributed mode is also more fault tolerant. If a node unexpectedly leaves the cluster, Kafka Connect automatically distributes the work of that node to other nodes in the cluster. And, because Kafka Connect stores connector configurations, offsets, and status in Kafka itself (see Connect Internal Topics below), this state survives worker failures.

/usr/bin/connect-distributed worker.properties

 

The parameter worker.properties is the worker configuration properties file. This will be present at /etc/schema-registry/connect-avro-distributed.properties.

In standalone mode, connector configuration property files are added as command-line parameters. But in distributed mode, connectors are deployed and managed using a REST API request.


Connect Internal Topics

Connect stores connector and task configurations, offsets, and status in several Kafka topics. These are referred to as Kafka Connect internal topics.

Distributed Workers that are configured with matching group.id values automatically discover each other and form a Kafka Connect cluster. All Workers in the cluster use the same three internal topics to share connector configurations, offset data, and status updates. For this reason all distributed worker configurations in the same Connect cluster must have matching config.storage.topic, offset.storage.topic, and status.storage.topic properties.

 

Avro

To use the AvroConverter with Schema Registry, you specify the key.converter and value.converter properties in the worker configuration. An additional converter property must also be added that provides the Schema Registry URL.

 

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

 

Worker property (Distributed Mode)

#connect-avro-distributed.properties file

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The group ID is a unique identifier for the set of workers that form a single Kafka Connect cluster
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://<Server IP/hostname>:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://<Server IP/hostname>:8081

# The following properties set the names of the three internal topics for storing configs, offsets, and status.
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

# The following properties set the replication factor for the three internal topics.
# ALWAYS use a replication factor of AT LEAST 3 for production environments (as done here) to reduce
# the risk of losing connector offsets, configurations, and status; a single-broker development
# cluster would have to use a replication factor of 1.
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

# The offsets, status, and configurations are written to the topics using converters specified through
# the following required properties. Most users will always want to use the JSON converter without schemas.
# Offset and config data is never visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# These are provided to inform the user about the presence of the REST host and port configs.
# Hostname & port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
rest.host.name=<Server IP/hostname>
rest.port=8083

# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors
plugin.path=/usr/share/java

 

Connect REST Interface

Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors. When executed in distributed mode, the REST API will be the primary interface to the cluster. You can make requests to any cluster member.
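
Any HTTP client can call this API. As an illustration, a minimal Java 11 HttpClient sketch that lists the active connectors (the worker host connect.example.com and the default REST port 8083 are assumptions):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListConnectors {
   public static void main(String[] args) throws Exception {
      // Any Connect worker in the cluster can serve REST requests (port 8083 by default)
      HttpClient client = HttpClient.newHttpClient();
      HttpRequest request = HttpRequest.newBuilder()
         .uri(URI.create("http://connect.example.com:8083/connectors"))
         .header("Accept", "application/json")
         .GET()
         .build();
      HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
      // Prints the status code and a JSON array of active connector names
      System.out.println(response.statusCode() + " " + response.body());
   }
}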

 

Connectors

 GET /connectors

Get a list of active connectors

 

Response JSON Object:

Connectors (array) – List of connector names

Example request:

GET /connectors

Host: connect.example.com

Accept: application/json

 Example response:

HTTP/1.1 200 OK

Content-Type: application/json

 ["testOracle-distributed-Jdbc-Source ", " testOracle-distributed-Jdbc-Sink"]

 

POST /connectors

Create a new connector, returning the current connector info if successful. Return 409 (Conflict) if rebalance is in process.

Request JSON Object:

·      name (string) – Name of the connector to create

·      config (map) – Configuration parameters for the connector. All values should be strings.

Response JSON Object:

·       name (string) – Name of the created connector

·       config (map) – Configuration parameters for the connector.

·       tasks (array) – List of active tasks generated by the connector

·       tasks[i].connector (string) – The name of the connector the task belongs to

·       tasks[i].task (int) – Task ID within the connector.

 

Example request:

JDBC-Source

POST /connectors HTTP/1.1

Host: connect.example.com

Content-Type: application/json

Accept: application/json

{
  "name": "testOracle-distributed-Jdbc-Source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "name": "testOracle-distributed-Jdbc-Source",
    "dialect.name": "OracleDatabaseDialect",
    "connection.url": "jdbc:oracle:thin:@<hostName/IP>:1521/<schemaName>",
    "connection.user": "*********",
    "connection.password": "******",
    "tasks.max": "10",
    "topic.prefix": "ORACLE-",
    "table.whitelist": "KAFKA_CONN_SOURCE",
    "mode": "timestamp+incrementing",
    "timestamp.column.name":"MODIFIED",
    "incrementing.column.name": "ID",
    "numeric.mapping": "best_fit",
    "value.converter.schema.registry.url": "http:// <hostName/IP>:8081",    
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http:// <hostName/IP>:8081"
  }
}


JDBC-Sink

POST /connectors HTTP/1.1

Host: connect.example.com

Content-Type: application/json

Accept: application/json

{
  "name": "testOracle-distributed-Jdbc-Sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "name": "testOracle-distributed-Jdbc-Sink",
    "connection.url": "jdbc:oracle:thin:@ <hostName/IP>:1521/<schemaName>",
    "connection.user": "*******",
    "connection.password": "***********",
    "dialect.name": "OracleDatabaseDialect",
    "table.name.format": "KAFKA_CONN_SINK",
    "tasks.max": "1",
    "topics": "ORACLE-KAFKA_CONN_SOURCE",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "ID",
    "auto.create": "true",
    "auto.evolve": "true",
    "numeric.mapping": "best_fit",    
    "transforms": "DropField,RenameField",
    "transforms.DropField.blacklist": "LNAME", 
    "transforms.RenameField.renames": "FNAME:FULL_NAME",   
    "transforms.RenameField.type":          "org.apache.kafka.connect.transforms.ReplaceField$Value",    
    "transforms.DropField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "value.converter.schema.registry.url": "http:// <hostName/IP>:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http:// <hostName/IP>:8081"
  }
}

 

Different modes for jdbc source connectors:

incrementing: use a strictly incrementing column on each table to detect only new rows. Note that this will not detect modifications or deletions of existing rows.

mode=incrementing

incrementing.column.name=TBL_ID

bulk: perform a bulk load of the entire table each time it is polled.

mode=bulk

timestamp: use a timestamp (or timestamp-like) column to detect new and modified rows. This assumes the column is updated with each write, and that values are monotonically incrementing, but not necessarily unique.

mode=timestamp

timestamp.column.name=custom_timestamp

timestamp+incrementing: use two columns, a timestamp column that detects new and modified rows and a strictly incrementing column which provides a globally unique ID for updates so each row can be assigned a unique stream offset.

mode=timestamp+incrementing

timestamp.column.name=custom_timestamp

incrementing.column.name=TBL_ID

 

Different modes for JDBC sink connectors (insert.mode)

insert: Use standard SQL INSERT statements.

upsert: Use the appropriate upsert semantics for the target database if it is supported by the connector, e.g. INSERT/UPDATE OR IGNORE.

update: Use the appropriate update semantics for the target database if it is supported by the connector, e.g. UPDATE.

 

GET /connectors/<Name of the connector>

Get information about the connector.

Response JSON Object:

·       name (string) – Name of the created connector

·       config (map) – Configuration parameters for the connector.

·       tasks (array) – List of active tasks generated by the connector

·       tasks[i].connector (string) – The name of the connector the task belongs  to

·       tasks[i].task (int) – Task ID within the connector.

 

GET /connectors/<name of the connector>/config

Get the configuration for the connector.

Response JSON Object:

·       config (map) – Configuration parameters for the connector.

PUT /connectors/<Name of the connector>/config

Create a new connector using the given configuration, or update the configuration for an existing connector. Returns information about the connector after the change has been made. Return 409 (Conflict) if rebalance is in process.

Request JSON Object:

·       config (map) – Configuration parameters for the connector. All values should be strings.

Response JSON Object:

·       name (string) – Name of the created connector

·       config (map) – Configuration parameters for the connector.

·       tasks (array) – List of active tasks generated by the connector

·       tasks[i].connector (string) – The name of the connector the task belongs to

·       tasks[i].task (int) – Task ID within the connector.

 

Example request:

PUT /connectors/hdfs-sink-connector/config HTTP/1.1

Host: connect.example.com

Accept: application/json

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "name": "testOracle-distributed-Jdbc-Source",
    "dialect.name": "OracleDatabaseDialect",
    "connection.url": "jdbc:oracle:thin:@ <hostName/IP>::1521/pnpdev",
    "connection.user": "********",
    "connection.password": "**************",
    "tasks.max": "10",
    "topic.prefix": "ORACLE-",
    "table.whitelist": "KAFKA_CONN_SOURCE",
    "mode": "timestamp+incrementing",
    "timestamp.column.name":"MODIFIED",
    "incrementing.column.name": "ID",
    "numeric.mapping": "best_fit",
    "value.converter.schema.registry.url": "http:// <hostName/IP>::8081",    
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http:// <hostName/IP>::8081"
  }

 

GET /connectors/<Name of the connector>/status

Get current status of the connector, including whether it is running, failed or paused, which worker it is assigned to, error information if it has failed, and the state of all its tasks.

POST /connectors/<Name of the connector>/restart

Restart the connector. Return 409 (Conflict) if rebalance is in process.

PUT /connectors/<Name of the connector>/pause

Pause the connector and its tasks, which stops message processing until the connector is resumed. This call is asynchronous and the tasks will not transition to the PAUSED state at the same time.

PUT /connectors/<Name of the connector>/resume

Resume a paused connector or do nothing if the connector is not paused. This call is asynchronous and the tasks will not transition to the RUNNING state at the same time.

DELETE /connectors/<Name of the connector>/

Delete a connector, halting all tasks and deleting its configuration. Return 409 (Conflict) if rebalance is in process.

 

Tasks

GET /connectors/<Name of the connector>/tasks

Get a list of tasks currently running for the connector.

GET /connectors/<Name of the connector>/tasks/<taskId>/status

Get a task’s status.

POST /connectors/<Name of the connector>/tasks/<taskId>/restart

Restart an individual task.

 

Connector Plugins

GET /connector-plugins/

Return a list of connector plugins installed in the Kafka Connect cluster.

 

Some of the useful connect commands on Unix

To Start the schema registry

/usr/bin/schema-registry-start  /etc/schema-registry/schema-registry.properties &

To start the connect worker in distributed mode

/usr/bin/connect-distributed /etc/schema-registry/connect-avro-distributed.properties &

 

Some of the curl commands 

curl <hostName/IP>:8083/connectors | jq

curl <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Source | jq

curl <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Sink | jq

curl <hostName/IP>:8083/connector-plugins | jq

curl -X POST <hostName/IP>:8083/connectors/s3-connector/restart

curl <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Source/tasks | jq

curl -X POST <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Source/tasks/0/restart

curl -X POST <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Sink/tasks/0/restart

curl <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Source/status | jq

curl <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Sink/status | jq

curl -X DELETE <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Source

curl -X DELETE <hostName/IP>:8083/connectors/testOracle-distributed-Jdbc-Sink

 

https://docs.confluent.io/platform/current/connect/userguide.html#connect-userguide


Kafka Streams
Kafka Streams enables us to consume from Kafka topics, analyze or transform data, and potentially send it to another Kafka topic.

The Kafka Streams API allows you to create real-time applications that power your core business. It is one of the easiest yet most powerful technologies for processing data stored in Kafka. It builds upon important concepts for stream processing, such as efficient management of application state, fast and efficient aggregations and joins, properly distinguishing between event-time and processing-time, and seamless handling of late-arriving and out-of-order data.
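
As an illustration, here is a minimal Kafka Streams sketch that reads from one topic, transforms each value, and writes the result to another topic; the application id and topic names are placeholders, not values from this document.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class SimpleStream {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-stream-app");
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

      // Read from an input topic, transform each value, and write to an output topic
      StreamsBuilder builder = new StreamsBuilder();
      KStream<String, String> source = builder.stream("input-topic");
      source.mapValues(value -> value.toUpperCase()).to("output-topic");

      KafkaStreams streams = new KafkaStreams(builder.build(), props);
      streams.start();
      // Close the topology cleanly on shutdown
      Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
   }
}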

