frank
November 26, 2022, 12:32pm
1
I want to send some modules' logs to Kafka, each to a different topic, but it doesn't work and I don't see any error output.
redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);
redef Kafka::topic_name = "";
#redef Kafka::send_all_active_logs = T;
#redef Kafka::tag_json = T;

redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "192.168.31.138:9092"
);

# ... (intervening lines omitted in the post) ...

event zeek_init() &priority=-10
{
    for ( stream_id in Log::active_streams )
    {
        if ( send_to_kafka(stream_id) )
        {
            local filter: Log::Filter = [
                $name = fmt("kafka-%s", stream_id),
                $writer = Log::WRITER_KAFKAWRITER,
                $config = table(
                    ["metadata.broker.list"] = "192.168.31.138:9092"
                ),
                $path = fmt("zeek_%s", stream_id)
            ];
            print "----", fmt("zeek-%s", stream_id);
            Log::add_filter(stream_id, filter);
        }
    }
}
awelzel
November 28, 2022, 5:21pm
2
Without having tried it, could you attempt to use &priority=-20 on your zeek_init() handler? The zeek_init() handler of the zeek-kafka plugin runs at priority=-10 as well and may replace the filters you created. Using a lower priority guarantees that your zeek_init() implementation runs later.
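A minimal sketch of that change against the script from your first post, everything else unchanged (send_to_kafka() here is the helper from your own script, which isn't shown in the thread):

    event zeek_init() &priority=-20
    {
        for ( stream_id in Log::active_streams )
        {
            if ( send_to_kafka(stream_id) )
            {
                local filter: Log::Filter = [
                    $name = fmt("kafka-%s", stream_id),
                    $writer = Log::WRITER_KAFKAWRITER,
                    $config = table(
                        ["metadata.broker.list"] = "192.168.31.138:9092"
                    ),
                    $path = fmt("zeek_%s", stream_id)
                ];
                Log::add_filter(stream_id, filter);
            }
        }
    }

With -20 this handler runs after the plugin's own -10 handler, so the plugin can no longer overwrite the filters added here.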
Hope this helps,
Arne
frank
November 29, 2022, 12:51am
3
redef Kafka::topic_name = "";
redef Kafka::send_all_active_logs = T;
#redef Kafka::tag_json = T;

redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "192.168.31.138:9092"
);

# ... (intervening lines omitted in the post) ...

event zeek_init() &priority=-10
{
    for ( stream_id in Log::active_streams )
    {
        if ( send_to_kafka(stream_id) )
        {
            local topicName: string = "zeek";
            local streamName: string = fmt("%s", stream_id);
            local s_t = split_string(streamName, /::/);
            for ( index_st in s_t )
            {
                topicName += "_";
                topicName += s_t[index_st];
            }
            #print "--topic name--", topicName, "stream id ", streamName;
            local filter: Log::Filter = [
                $name = fmt("kafka-%s", stream_id),
                $writer = Log::WRITER_KAFKAWRITER,
                $config = table(
                    ["metadata.broker.list"] = "192.168.31.138:9092"
                ),
                $path = topicName
            ];
            Log::add_filter(stream_id, filter);
        }
    }
}
This works. It seems Kafka::kafka_conf must be configured, and topicName must be a string without "::", while stream_id always contains "::".
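For reference, a more compact way to build a "::"-free topic name than the split/concat loop above; just a sketch (not from the thread), assuming Zeek's built-in gsub() and the -20 priority suggested earlier:

    event zeek_init() &priority=-20
    {
        for ( stream_id in Log::active_streams )
        {
            # e.g. HTTP::LOG -> "zeek_HTTP_LOG": gsub() replaces every
            # "::" so the resulting name is usable as a Kafka topic.
            local topicName = fmt("zeek_%s", gsub(fmt("%s", stream_id), /::/, "_"));
            print topicName;
        }
    }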