Aggregate similar communications in Zeek script

Good Morning,

I have a Zeek script to log Modbus traffic under specific conditions. I am looking to group similar communications by port, source IP, destination IP and the Modbus function called.

Right now it looks like this:

export {
    redef enum Log::ID += { LOG_DETAILED};

    
    type Modbus_Detailed: record {
        ts_request                      : time              &log;             # Timestamp of event
        ts_last_response                     : time              &log &optional;
        min_response_time               : interval                  &log &optional;
        average_response_time           : interval                  &log &optional;
        max_response_time               : interval                  &log &optional;
        response_count                  : count             &log &optional;
        tid                     : count            &log;             # Modbus transaction ID
        id                      : string           &log;             # Zeek unique ID for connection (c$uid)
        unit_id                 : count             &log;             # Modbus unit-id
        func                    : string            &log &optional;   # Modbus Function
        network_direction       : string            &log &optional;   # Message direction (request or response)
        address                 : count             &log &optional;   # Starting address for value(s) field
        quantity                : count             &log &optional;   # Number of addresses/values read or written to
        values                  : string            &log &optional;   # Coils, discrete_inputs_request, or registers read/written to
    };
    global log_modbus_detailed: event(rec: Modbus_Detailed);
}
global function_ids: table[string, string] of Modbus_Detailed;

event modbus_message(c: connection,
                     headers: ModbusHeaders,
                     is_orig: bool) &priority=-5 {

    if ([c$uid, c$modbus$func] !in function_ids){

        local modbus_detailed_rec: Modbus_Detailed;

        modbus_detailed_rec$ts_request = network_time();
        modbus_detailed_rec$tid = headers$tid;
        modbus_detailed_rec$id = c$uid;
        modbus_detailed_rec$unit_id = headers$uid;
        modbus_detailed_rec$func = c$modbus$func;
        modbus_detailed_rec$network_direction = request_str_g;
        modbus_detailed_rec$response_count = 0;

        function_ids[c$uid, c$modbus$func] = modbus_detailed_rec;
        
   }else{
        local response: Modbus_Detailed;
        response$ts_last_response = network_time();

        function_ids[c$uid, c$modbus$func]$ts_last_response = response$ts_last_response;
        function_ids[c$uid, c$modbus$func]$network_direction = response_str_g;

        local time_interval: interval = function_ids[c$uid,c$modbus$func]$ts_request - response$ts_last_response;
                
        if(time_interval > function_ids[c$uid,c$modbus$func]$max_response_time){
                function_ids[c$uid, c$modbus$func]$max_response_time = time_interval;
        }
        if(time_interval < function_ids[c$uid, c$modbus$func]$min_response_time){
                function_ids[c$uid, c$modbus$func]$min_response_time = time_interval;
        }

        function_ids[c$uid, c$modbus$func]$response_count += 1;

        if(function_ids[c$uid,c$modbus$func]$response_count > 2){
          Log::write(LOG_DETAILED, function_ids[c$uid, c$modbus$func]);
          delete function_ids[c$uid, c$modbus$func];
        }
   }
}

I tried to identify similar communications using the c$uid parameter, because it represents one connection, together with the c$modbus$func parameter. But it seems that I always end up in the if branch of the modbus_message handler and never reach the else branch.

My first guess is that c$uid is in fact unique at each execution of the handler, but when looking at my logs I see multiple occurrences of the same c$uid.

Or maybe the table function_ids is always empty when reaching the if statement, but I don’t know how I should fix it.

Thank you for your help

Hi there,

Your approach is generally correct. It really helps in such situations to see code that is runnable in isolation and without syntax errors. I made a pass over your script so you can do that, see below. If you run it with the modbus.trace file from the Zeek distribution, you should obtain a modbus_detailed.log to get you started, with lines like these:

1342776596.696964       1342776596.773953       3       19785   READ_HOLDING_REGISTERS
1342776596.774994       1342776596.805184       3       19790   READ_INPUT_REGISTERS
1342776596.806146       1342776596.836439       3       19792   READ_INPUT_REGISTERS
1342776596.837545       1342776597.867809       3       19794   READ_INPUT_REGISTERS
1342776597.884125       1342776597.914570       3       19797   READ_INPUT_REGISTERS

Note that this still has various problems. For example, you should always check that an optional field exists before accessing it, with something like if ( c$modbus?$func ) before using c$modbus$func. Also, as written, the script never expires state from the function_ids table if the response count doesn’t exceed 2, which is a typical state leak.
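A minimal sketch of both points, assuming the base Modbus scripts are loaded (they are what add the optional c$modbus record to the connection). The module name, the Entry record, the pending table and the 5-minute timeout are illustrative choices, not part of your script or of icsnpp-modbus:

```zeek
module Modbus_Guard_Sketch;

# Hypothetical, trimmed-down state record for this illustration.
type Entry: record {
        ts_request: time;
        response_count: count &default=0;
};

# Entries that the handler never deletes are dropped 5 minutes after
# they were created, so the table cannot grow without bound.
global pending: table[string, string] of Entry &create_expire=5min;

event modbus_message(c: connection, headers: ModbusHeaders, is_orig: bool) &priority=-5
        {
        # Both c$modbus and its func field are optional: check before use.
        if ( ! c?$modbus || ! c$modbus?$func )
                return;

        local fn = c$modbus$func;

        if ( [c$uid, fn] !in pending )
                pending[c$uid, fn] = Entry($ts_request=network_time());
        else
                pending[c$uid, fn]$response_count += 1;
        }
```

Note that &create_expire removes stale entries silently. If you would rather write them to the log before they disappear, an &expire_func on the table is the usual place to do that.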

I think you’re basing this on the CISA icsnpp-modbus package, right? In that case there are also questions around whether you’d like your script to extend that package or replace it. We’re happy to help once you get to that point.

Best,
Christian

module Modbus_Extended;

export {
        redef enum Log::ID += { LOG_DETAILED };

        type Modbus_Detailed: record {
                ts_request: time &log; # Timestamp of event
                ts_last_response: time &log &optional;
                response_count: count &log &optional;
                tid: count &log; # Modbus transaction ID
                func: string &log &optional; # Modbus Function
        };
}

global function_ids: table[string, string] of Modbus_Detailed;

event modbus_message(c: connection, headers: ModbusHeaders, is_orig: bool) &priority=-5
        {
        if ( [c$uid, c$modbus$func] !in function_ids )
                {
                local modbus_detailed_rec: Modbus_Detailed;

                modbus_detailed_rec$ts_request = network_time();
                modbus_detailed_rec$tid = headers$tid;
                modbus_detailed_rec$func = c$modbus$func;
                modbus_detailed_rec$response_count = 0;

                function_ids[c$uid, c$modbus$func] = modbus_detailed_rec;
                }
        else
                {
                local ts = network_time();

                function_ids[c$uid, c$modbus$func]$ts_last_response = ts;

                local time_interval = ts - function_ids[c$uid, c$modbus$func]$ts_request;

                function_ids[c$uid, c$modbus$func]$response_count += 1;

                if ( function_ids[c$uid, c$modbus$func]$response_count > 2 )
                        {
                        Log::write(LOG_DETAILED, function_ids[c$uid, c$modbus$func]);
                        delete function_ids[c$uid, c$modbus$func];
                        }
                }
        }

event zeek_init()
        {
        Log::create_stream(Modbus_Extended::LOG_DETAILED, [$columns=Modbus_Detailed, $path="modbus_detailed"]);
        }

Thank you for your answer. I am indeed using the icsnpp-modbus package and wish to extend its capabilities by adding log streams for my use cases. My logs now look like this:

1670430058.145471       1670430060.977248       -0.005143       -       -       101     5539    130.65.1.168    1597    130.65.1.18     502     CGxS14leMXogphUf1       1       READ_HOLDING_REGISTERS
1670430058.129123       1670430061.374297       -0.010606       -       -       101     6155    130.65.1.168    1709    130.65.1.175    502     Cph1Qa46shwX9cFW1h      1       READ_HOLDING_REGISTERS
1670430058.128529       1670430061.478646       -0.006571       -       -       101     30450   130.65.1.168    1675    130.65.1.13     502     CKtQcj3zu0UH9kKULc      1       READ_HOLDING_REGISTERS    
1670430058.164589       1670430063.089129       -0.010487       -       -       101     34093   130.65.1.168    1564    130.65.1.173    502     CMkCwC3PHzrMeBwCA7      1       READ_HOLDING_REGISTERS

I am able to aggregate communications by response_count, but I am unable to store time_interval in function_ids[c$uid, c$modbus$func]$min_response_time.

I want to compare the previous value of function_ids[c$uid, c$modbus$func]$min_response_time with time_interval, and store the new value if the condition is met, like this:

  response$ts_last_response = current_t;
                local time_interval = transaction_ids[c$id, headers$tid]$ts_request - response$ts_last_response;

                if(function_ids[c$uid, headers$function_code]$min_response_time > time_interval){
                        function_ids[c$uid, headers$function_code]$min_response_time = time_interval;
                }

I don’t get any error but the min_response_time field is always empty. My guess is that I have to check if the field is initialized or not. But I am not able to find any function to do so since (!function_ids[c$uid, c$modbus$func]$min_response_time) is not a valid operand.

How can I check if the field min_response_time (an interval) is unset?

You can check whether an optional field is set via the ?$ operator; it is covered in the record type section of the Zeek script reference.

You need a check somewhere to see whether $min_response_time has been set, and if not, set it to the just computed interval. In subsequent updates you can then shrink it whenever there’s a smaller interval. And similarly for $max_response_time. So something like this:

response$ts_last_response = current_t;
local time_interval = current_t - function_ids[c$id, headers$tid]$ts_request;

if ( ! function_ids[c$uid, headers$function_code]?$min_response_time
     || function_ids[c$uid, headers$function_code]$min_response_time > time_interval ) {
    function_ids[c$uid, headers$function_code]$min_response_time = time_interval;
}
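The same pattern extends to the maximum and to an average. Here is a small standalone sketch that runs without any traffic; the RT_Stats record, its field names and the update_stats helper are made up for the illustration, and the average is derived from a running total rather than stored:

```zeek
# Hypothetical record holding only the response-time statistics.
type RT_Stats: record {
        min_rt: interval &optional;
        max_rt: interval &optional;
        total_rt: interval &default=0secs;
        n: count &default=0;
};

# Fold one observed response time into the statistics.
# Records are passed by reference, so the caller sees the updates.
function update_stats(s: RT_Stats, rt: interval)
        {
        # First sample, or a new minimum.
        if ( ! s?$min_rt || rt < s$min_rt )
                s$min_rt = rt;

        # First sample, or a new maximum.
        if ( ! s?$max_rt || rt > s$max_rt )
                s$max_rt = rt;

        s$total_rt = s$total_rt + rt;
        s$n += 1;
        }

event zeek_init()
        {
        local s = RT_Stats();

        update_stats(s, 30msec);
        update_stats(s, 10msec);
        update_stats(s, 20msec);

        # Minimum, maximum, and average (total divided by sample count).
        print s$min_rt, s$max_rt, s$total_rt / s$n;
        }
```

In your handler you would call such a helper with current_t minus the stored request timestamp, and compute the average only when writing the log entry.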

Let us know how it goes.

Best,
Christian

Thank you, that was the operator I was looking for :)