kafka_open()

Read(40) Label: connect, kafka server,

Description:

Connect to Kafka server.

Syntax:

kafka_open(filename or fileObject; topic,...; partitionSize)

Note:

The external library function connects to Kafka server, and stores attribute parameter properties in the configuration file with extension .properties. It supports single-machine pattern and cluster pattern; it is recommended that the machine holding the leader is used when you are trying to connect to the server through a cluster.

Parameters:

topic

Query one or more specified topics

filename

The attribute parameter file with the extension .properties, which contains the send key, value encoding and message decoding; all of them should correspond to each other one by one

fileObject

A file object

partitionSize

The number of zones on a topic, which can be absent and is valid only in the context of a cluster

Options:

@c

For the use of clustering and related operations

Return value:

The consumer object

Example:

 

A

 

1

=kafka_open("D://kafka.properties";"topic-test")

Connect topic-test’s configuration file kafka.properties to Kafka server using a topic

2

=kafka_open@c(file("D://kafka.properties");"topic-test";3)

Connect topic-test’s configuration file kafka.properties to Kafka server using a file object

3

=kafka_close(A1)

 

 

Below is the content of D://kafka.properties:

##produce 

bootstrap.servers=192.168.0.1:9092

producer.type=sync 

request.required.acks=1 

serializer.class=kafka.serializer.DefaultEncoder 

key.serializer=org.apache.kafka.common.serialization.StringSerializer 

value.serializer=org.apache.kafka.common.serialization.StringSerializer 

 

##consume 

group.id=test

zookeeper.session.timeout.ms=1000 

zookeeper.sync.time.ms=200 

auto.commit.interval.ms=500 

auto.offset.reset=earliest

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 

value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Explanations of user-defined coding and decoding:
Put the jar containing encoding and decoding information in exlib\KafkaCil directory, as shown below:

In raq-kafa-cil-2.11.jar, the encoding file is EncodeingSequence.class, the decoding file is DecodeingSequence.class, and their corresponding record object is Sequence.

Below are configurations of user-defined encoding and decoding in .properties file:
key.serializer=org.apache.kafka.common.serialization.StringSerializer 
value.serializer=com.raqsoft.lib.kafka.EncodeingSequence

 

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer  value.deserializer=com.raqsoft.lib.kafka.DecodeingSequence