Description:
Connect to a Kafka server.
Syntax:
kafka_open(filename or fileObject; topic,...; partitionSize)
Note:
This external library function connects to a Kafka server. The connection properties are stored in a configuration file with the extension .properties. Both single-machine mode and cluster mode are supported; when connecting through a cluster, it is recommended to use the machine that holds the leader.
Parameters:
topic |
One or more topics to query |
filename |
The property file with the extension .properties, which configures the key and value encoding used for sending messages and the decoding used for receiving them; each encoder must correspond one-to-one to its decoder |
fileObject |
A file object |
partitionSize |
The number of partitions on a topic; it can be omitted and is valid only in cluster mode |
Options:
@c |
Connect in cluster mode and enable cluster-related operations |
Return value:
The consumer object
Example:
|
A |
|
1 |
=kafka_open("D://kafka.properties";"topic-test") |
Connect to the Kafka server for topic topic-test, passing the configuration file kafka.properties by file name |
2 |
=kafka_open@c(file("D://kafka.properties");"topic-test";3) |
Connect to the Kafka server in cluster mode for topic topic-test with 3 partitions, passing the configuration file kafka.properties as a file object |
3 |
=kafka_close(A1) |
|
Below is the content of D://kafka.properties:
##produce
bootstrap.servers=192.168.0.1:9092
producer.type=sync
request.required.acks=1
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
##consume
group.id=test
zookeeper.session.timeout.ms=1000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=500
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
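Before passing a file like this to kafka_open, it can help to check that the connection and encoding/decoding keys are all present. The following is a minimal Java sketch of such a check, not part of the external library: the class KafkaPropsCheck and its list of expected keys are hypothetical and taken only from the sample file above, and the parsing relies on the standard java.util.Properties format, in which lines starting with # are comments.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; it is not shipped with the Kafka external library.
class KafkaPropsCheck {

    // Assumption: these are the keys the sample kafka.properties relies on.
    static final String[] EXPECTED_KEYS = {
        "bootstrap.servers",
        "group.id",
        "key.serializer",
        "value.serializer",
        "key.deserializer",
        "value.deserializer"
    };

    // Parse .properties text; lines such as "##consume" are treated as comments.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Return the expected keys that are absent or blank.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : EXPECTED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String sample = "##consume\n"
            + "bootstrap.servers=192.168.0.1:9092\n"
            + "group.id=test\n";
        System.out.println("Missing keys: " + missingKeys(parse(sample)));
    }
}
```

An empty result only means the keys exist; it does not verify that each serializer actually matches its deserializer.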
Explanation of user-defined encoding and decoding:
Put the jar containing the encoding and decoding classes in the exlib\KafkaCil directory.
In raq-kafa-cil-2.11.jar, the encoding class is EncodeingSequence.class, the decoding class is DecodeingSequence.class, and their corresponding record object is Sequence.
Below are the configurations of user-defined encoding and decoding in the .properties file:
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.raqsoft.lib.kafka.EncodeingSequence
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.raqsoft.lib.kafka.DecodeingSequence
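To illustrate why the encoder and decoder must correspond to each other, here is a hedged Java sketch of a matching pair that round-trips a record through bytes. SketchEncoder and SketchDecoder are hypothetical stand-ins, not the EncodeingSequence and DecodeingSequence classes from the jar, and a List<String> stands in for the Sequence record object. In a real jar such classes would implement the Kafka client interfaces org.apache.kafka.common.serialization.Serializer and Deserializer; those interfaces are left out here so the sketch compiles with the JDK alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical encoder: writes a field count followed by each field as UTF text.
class SketchEncoder {
    // Same shape as Kafka's Serializer.serialize(String topic, T data).
    byte[] serialize(String topic, List<String> record) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(record.size());
            for (String field : record) {
                out.writeUTF(field);
            }
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

// Hypothetical decoder: reads exactly the layout that SketchEncoder writes.
class SketchDecoder {
    // Same shape as Kafka's Deserializer.deserialize(String topic, byte[] data).
    List<String> deserialize(String topic, byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int count = in.readInt();
            List<String> record = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                record.add(in.readUTF());
            }
            return record;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If value.serializer named one class and value.deserializer named a class expecting a different byte layout, the consumer side would fail to rebuild the record, which is why the two entries in the .properties file are configured as a pair.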