Send logs to kafka with different topic using zeek-kafka plugin

I want to send some modules’ log to kafka with different topic, it does’t work
and I don’t see any errors output

4 redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);
5 redef Kafka::topic_name = “”;
6 #redef Kafka::send_all_active_logs = T;
7 #redef Kafka::tag_json = T;
8
9 redef Kafka::kafka_conf = table(
10 [“metadata.broker.list”] = “192.168.31.138:9092”
11 );

33 event zeek_init() &priority=-10
34 {
35 for (stream_id in Log::active_streams)
36 {
37 if (send_to_kafka(stream_id))
38 {
39 local filter: Log::Filter = [
40 $name = fmt(“kafka-%s”, stream_id),
41 $writer = Log::WRITER_KAFKAWRITER,
42 $config = table(
43 [“metadata.broker.list”] = “192.168.31.138:9092”
44 ),
45 $path = fmt(“zeek_%s”, stream_id)
46 ];
47 print “----”,fmt(“zeek-%s”, stream_id);
48 Log::add_filter(stream_id, filter);
49 }
50 }
51 }

Without having tried, could you attempt to use &priority=-20 on your zeek_init() handler? The zeek_init() handler of the zeek-kafka plugin runs at priority=-10 as well and my replace the filters you created. Using a lower priority guarantees your zeek_init() implementations runs later.

Hope this helps,
Arne

5 redef Kafka::topic_name = “”;
6 redef Kafka::send_all_active_logs = T;
7 #redef Kafka::tag_json = T;
8
9 redef Kafka::kafka_conf = table(
10 [“metadata.broker.list”] = “192.168.31.138:9092”
11 );

33 event zeek_init() &priority=-10
34 {
35 for (stream_id in Log::active_streams)
36 {
37 if (send_to_kafka(stream_id))
38 {
39 local topicName:string = “zeek”;
40 local streamName:string = fmt(“%s”, stream_id);
41 local s_t = split_string(streamName, /::/);
42 for( index_st in s_t)
43 {
44 topicName += “_”;
45 topicName +=s_t[index_st];
46 }
47 #print “–topic name—”, topicName, "stram id ", streamName;
48 local filter: Log::Filter = [
49 $name = fmt(“kafka-%s”, stream_id),
50 $writer = Log::WRITER_KAFKAWRITER,
51 $config = table(
52 [“metadata.broker.list”] = “192.168.31.138:9092”
53 ),
54 $path = topicName
55 ];
56 Log::add_filter(stream_id, filter);
57 }
58 }
}

this can works,
seems Kafka::kafka_conf must be configured.
and topicName must be a string without “::”, stream_id is always contains “::”