Simple stream processing with bash

Posted on 2017-02-17

A common issue when managing a large number of hosts is knowing whether they are online or not. A couple of weeks ago I got a little time on my hands and felt like solving that problem.

The idea is to ping every possible host every so often, parse the response and finally publish a JSON object to kafka for future processing.

We split our problem into four distinct parts and tackle them one at a time:

  1. Ping all hosts continuously with fping
  2. Parse the output from fping with bash's regular expressions
  3. Create a JSON message with jq
  4. Send the finished JSON object to kafka with kafkacat

In essence, what we are doing is simple stream processing. The source of our stream is fping. The data is then transformed into JSON with jq and finally sent to our sink, kafka.

Ping hosts

There are many easy ways of sending ICMP echo requests to all hosts in a network, such as forking ping a bunch of times or using parallel. However, that doesn't scale particularly well and the output is difficult to parse. Therefore we use a tool called fping, which lets us ping entire subnets easily and efficiently.

fping -A -e -l -b12 -p 60 -g

Here is an explanation of the arguments; the full details can be found in the fping manpage.

Argument  Explanation
-A        Display the IP address instead of the DNS name
-e        Show the round trip time; we want to know how responsive the host is
-l        Loop: keep pinging forever
-b        Message size in bytes; the minimum size is 12 bytes
-p        Time between ICMP echo requests to the same host, in milliseconds
-g        The network (address and netmask) to ping

Handling fping output

Reading the output

We do not care about the statistics fping prints. As they are printed to stderr, we can easily ignore them by redirecting stderr to /dev/null with 2>/dev/null.
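To see the effect of the redirection without touching the network, here is a minimal sketch. The function noisy is hypothetical and only stands in for fping: it writes one reply-like line to stdout and one statistics-like line to stderr.

```shell
#!/usr/bin/env bash
# Hypothetical stand-in for fping: one line on stdout, one on stderr.
noisy() {
    echo "reply line"
    echo "statistics line" >&2
}

# With stderr redirected to /dev/null, only the stdout line is captured.
kept=$(noisy 2>/dev/null)
echo "$kept"
```

Only the line written to stdout ends up in kept, which is the behaviour we rely on when reading fping's replies.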

Each ping reply is printed on its own line, so we use the bash builtin read to read one line at a time, for example:

(echo first line && echo second line) | while read LINE; do
    echo "$LINE"
done
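A slightly more defensive variant of the same loop is sketched below. It is not part of the original script, and the sample lines are made up. Setting IFS to the empty string keeps leading and trailing whitespace, and -r stops read from treating backslashes as escape characters. The input comes from a process substitution rather than a pipe, so the array filled inside the loop is still available afterwards.

```shell
#!/usr/bin/env bash
# Collect lines into an array, one element per line.
# IFS= preserves surrounding whitespace, -r keeps backslashes literal.
lines=()
while IFS= read -r line; do
    lines+=("$line")
done < <(printf '%s\n' 'first line' '  indented \t line')

printf '%d lines read\n' "${#lines[@]}"
```

For the fping output neither whitespace nor backslashes are a concern, but read -r is a good default whenever the input is not under your control.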

Parsing the output

As bash has native support for regular expressions, we use a regex to parse the fping output.

We define an IP address as anything matching [0-9a-fA-F.:]+. The characters 0-9 and . are enough to match IPv4 addresses, while the extra characters a-fA-F and : let the pattern match IPv6 addresses as well.

An example of what we want to parse:   : [0], 40 bytes, 0.38 ms (0.38 avg, 0% loss)

Each field we are interested in is captured by one group of the regex:

Field                  Group  Matching part of the regex
IP address             1      ([0-9a-fA-F.:]+) +:
Sequence number        2      \[(.*)\],
Message size           3      (.*) bytes,
Round trip time        4      (.*) ms
Average response time  5      \((.*) avg,
Ratio of lost packets  6      (.*)% loss\)

The final regex ended up looking like this:

([0-9a-fA-F.:]+) +: \[(.*)\], (.*) bytes, (.*) ms \((.*) avg, (.*)% loss\)
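As a minimal sketch of how this regex can be applied, the snippet below matches one sample line with bash's =~ operator and reads the captured fields from the BASH_REMATCH array. The address 192.0.2.1 is an assumption (a documentation address) and not real output, and the lowercase variable names are chosen only for this illustration.

```shell
#!/usr/bin/env bash
# Sample line in fping's reply format; 192.0.2.1 is an assumed
# documentation address used only for illustration.
line='192.0.2.1 : [0], 40 bytes, 0.38 ms (0.38 avg, 0% loss)'

# Keep the regex in a variable and leave it unquoted on the right-hand
# side of =~, otherwise bash would match it as a literal string.
regex='([0-9a-fA-F.:]+) +: \[(.*)\], (.*) bytes, (.*) ms \((.*) avg, (.*)% loss\)'

if [[ $line =~ $regex ]]; then
    # BASH_REMATCH[0] is the whole match, the groups start at index 1.
    ip=${BASH_REMATCH[1]}
    sequence=${BASH_REMATCH[2]}
    size=${BASH_REMATCH[3]}
    roundtrip=${BASH_REMATCH[4]}
    average=${BASH_REMATCH[5]}
    loss=${BASH_REMATCH[6]}
    echo "ip=$ip size=$size roundtrip=$roundtrip"
fi
```

If the line does not match, the if body is skipped and none of the variables are set, so a caller can simply ignore such lines.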

Formatting data before sending

jq is a very powerful tool if you wish to either parse or generate JSON.

The final JSON object which we send to kafka should contain four fields: ip, size, roundtrip and time. For readability's sake I have added newlines and indentation to the example below; we will skip that in the final object.

{
    "ip": "",
    "size": 40,
    "roundtrip": 0.38,
    "time": "2015-06-23T22:42:14,033793204+0200"
}

Since everything in bash is a string, we need to convert roundtrip and size to numbers. We can do this with the jq function tonumber. Our final jq filter thus looks like this:

{ "ip": $ip, "size": $size|tonumber, "roundtrip": $roundtrip|tonumber, "time": $now }

With --null-input we tell jq not to read any input and only generate output. The --compact-output, --ascii-output and --monochrome-output flags give us a compact, single-line format that is suitable for sending to kafka. Moreover, we can pass values to jq with --arg name value. Inside the filter they are available as $name, much like variables in bash, and they are always passed as strings.

jq \
    --null-input \
    --compact-output --ascii-output --monochrome-output \
    --arg ip "" \
    --arg size "40" \
    --arg roundtrip "0.38" \
    --arg now "2015-06-23T22:42:14,033793204+0200" \
    '{ "ip": $ip, "size": $size|tonumber, "roundtrip": $roundtrip|tonumber, "time": $now }'


Finally we want to send the output to kafka. This is as simple as piping the result to kafkacat, which interprets each line as a separate message and sends it to the specified topic.

The finished script

Putting everything together we get something similar to this:



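# WAIT_TIMEOUT, TARGET_NETWORK and BROKERS must be set before running this.
fping -A -e -l -b12 -p "$WAIT_TIMEOUT" -g "$TARGET_NETWORK" 2>/dev/null \
    | while read -r PING_RESPONSE; do

    regex="([0-9a-fA-F.:]+) +: \[(.*)\], (.*) bytes, (.*) ms \((.*) avg, (.*)% loss\)"
    # Skip lines that do not match the regex.
    [[ $PING_RESPONSE =~ $regex ]] || continue

    # Capture groups: 1 = IP address, 3 = message size, 4 = round trip time.
    IP=${BASH_REMATCH[1]}
    SIZE=${BASH_REMATCH[3]}
    ROUNDTRIP=${BASH_REMATCH[4]}

    # --iso-8601=ns requires GNU date.
    NOW=$(date --iso-8601=ns)

    jq \
        --null-input \
        --compact-output --ascii-output --monochrome-output \
        --arg ip "$IP" \
        --arg size "$SIZE" \
        --arg roundtrip "$ROUNDTRIP" \
        --arg now "$NOW" \
        '{ "ip": $ip, "size": $size|tonumber, "roundtrip": $roundtrip|tonumber, "time": $now }'

done | kafkacat -P -b "$BROKERS" -t event.JSON.ICMP_response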


I started writing this post a while ago but never published it. Since then we have used this script at work in production to keep track of our mobile connected devices (which is the reason for the small packet size). It has been running for a long while and has worked flawlessly and without interruption ever since we first deployed it, demonstrating the power of combining simple tools.