Loading Spur JSONL data into OpenSearch

2024-06-30 22:54:13 +0200

I recently worked out how to import a daily feed of tens of millions of lines of JSONL data into OpenSearch.

Background

Spur.us offers a data feed of IP address intelligence data. Each day, you can download a file with about 30 million lines of data in JSONL format.

I wanted to be able to load this data such that:

On their site, Spur says:

Our feeds are designed to be easily ingested into most data-lake or cloud-native database solutions.

Turns out they are right: it is very easily done.

Setup

I set up a new server and installed OpenSearch using the Debian install guide.

Then I set up these field mappings on the index:

{
  "mappings" : {
    "properties" :  {
      "ip" : {
        "type" : "ip"
      },
      "tunnels.exits" : {
        "type" : "ip"
      },
      "tunnels.entries" : {
        "type" : "ip"
      },
      "client.concentration.geohash": {
        "type": "geo_point"
      }
    }
  }
}

We’re not making use of the geohash field, but the other ones are important: they tell OpenSearch to treat ip, tunnels.exits, and tunnels.entries as IP address fields rather than plain strings.
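The post does not show how the mappings were applied. One way, sketched here in Python with only the standard library, is to send them as the body of a create-index request (PUT /ipoid). The helper name create_index_request is made up for illustration, and authentication and TLS handling are left out.

```python
import json
import urllib.request


def create_index_request(base_url, index, mappings):
    """Build (but do not send) a create-index request carrying the mappings."""
    return urllib.request.Request(
        f"{base_url}/{index}",
        data=json.dumps(mappings).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# The same mappings as in the JSON above, written as a Python dict.
MAPPINGS = {
    "mappings": {
        "properties": {
            "ip": {"type": "ip"},
            "tunnels.exits": {"type": "ip"},
            "tunnels.entries": {"type": "ip"},
            "client.concentration.geohash": {"type": "geo_point"},
        }
    }
}

request = create_index_request("https://localhost:9200", "ipoid", MAPPINGS)
# To actually create the index: urllib.request.urlopen(request)
# (a self-signed certificate and basic auth need extra setup, omitted here).
```

The mappings have to be in place before the first import; the type of an existing field cannot be changed afterwards without reindexing.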

The import

I installed Benthos and created a config file at $HOME/config.yaml. The relevant parts are:

input:
  label: ""
  read_until:
    idle_timeout: 600s
    input:
      file:
        paths: [ ./feed/feed.json ]
        scanner:
          lines: {}
pipeline:
  threads: -1
  processors:
  - awk:
      codec: none
      program: |
        json_set("@timestamp", timestamp_format(timestamp_unix()));
output:
  opensearch:
    urls: [ https://localhost:9200 ]
    index: "ipoid"
    tls:
      enabled: true
      skip_cert_verify: true
    basic_auth:
      enabled: true
      username: {REDACTED} 
      password: {REDACTED}
    action: "index"
    id: '${! json("ip") }'
    max_in_flight: 64
    batching:
      count: 100
      byte_size: 0
      period: ""
      check: ""

In the processor section, I added a @timestamp field. This indicates the date of last update for a given IP address. That’s quite useful, because in the daily feed of data from Spur, it’s pretty common for IP addresses to drop out of the dataset, and re-appear later.

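Note also that the id property here sets the ip field in the JSONL entry as the ID for a given OpenSearch record. Combined with the index action, this means that re-importing an IP overwrites its existing record instead of creating a duplicate.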
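To make the effect of these two settings concrete, here is a small local simulation in Python. It is not what Benthos runs internally: a plain dict stands in for the index, the function names are made up, and the timestamp format is an ISO 8601 string that may differ from what the awk processor emits.

```python
import json
from datetime import datetime, timezone


def prepare_document(line, now):
    """Mimic the pipeline for one JSONL line: add @timestamp, use ip as the ID."""
    doc = json.loads(line)
    doc["@timestamp"] = now.astimezone(timezone.utc).isoformat()
    return doc["ip"], doc


def index_lines(store, lines, now):
    """Mimic action 'index': a document with an existing ID is overwritten."""
    for line in lines:
        doc_id, doc = prepare_document(line, now)
        store[doc_id] = doc
    return store


store = {}
day1 = datetime(2024, 6, 29, tzinfo=timezone.utc)
day2 = datetime(2024, 6, 30, tzinfo=timezone.utc)

# Day 1 feed contains two IPs; day 2 feed contains only one of them.
index_lines(store, ['{"ip": "192.0.2.1"}', '{"ip": "192.0.2.2"}'], day1)
index_lines(store, ['{"ip": "192.0.2.1"}'], day2)
```

After the second run the store still holds two records. 192.0.2.1 carries the day 2 timestamp, while 192.0.2.2, which dropped out of the feed, keeps its day 1 timestamp.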

I downloaded the daily feed file from Spur and extracted it to feed/feed.json.

Then I ran benthos -c ./config.yaml, and the import completed in about 50 minutes.

kharlan@ipoidopensearch:~$ curl -s "https://localhost:9200/_cat/indices?v"
health status index                        uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   ipoid                        0_N5JD_2Qmmpqa4XAb3qDg   1   1   37015900            0      6.9gb          6.9gb

It worked! From there, all I had to do for daily imports was define a cron entry to download the feed, and run the benthos command each day.

Cleaning up

As noted earlier, IPs can drop out and reappear later in the dataset. That means if you run this daily, you’ll eventually get a big pile of IPs that haven’t been seen in the Spur dataset for some period of time. Let’s clean those up with a daily purge script:

curl -s -X POST "http://localhost:9200/ipoid/_delete_by_query" \
-H 'Content-Type: application/json' -d '{
  "query": {
    "range": {
      "@timestamp": {
        "lt": "now-6w"
      }
    }
  }
}'

This query deletes all records whose @timestamp is older than six weeks. It also runs in a daily cron job.
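If you would rather see how many records a purge will remove before deleting anything, the same range query can first be sent to the _count endpoint. The following Python sketch does that. The function names and the injectable send parameter are assumptions for illustration, and authentication is omitted.

```python
import json
import urllib.request


def purge_query(max_age="6w"):
    """Range query matching records last seen more than max_age ago."""
    return {"query": {"range": {"@timestamp": {"lt": f"now-{max_age}"}}}}


def build_post(base_url, index, endpoint, body):
    """Build a JSON POST request for an index-level endpoint."""
    return urllib.request.Request(
        f"{base_url}/{index}/{endpoint}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def post_json(request):
    """Send the request and decode the JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def preview_and_purge(base_url, index, max_age="6w", send=post_json):
    """Count stale records first; delete only if there are any."""
    body = purge_query(max_age)
    stale = send(build_post(base_url, index, "_count", body))["count"]
    if stale == 0:
        return 0
    result = send(build_post(base_url, index, "_delete_by_query", body))
    return result["deleted"]
```

Calling preview_and_purge("http://localhost:9200", "ipoid") returns the number of deleted records, which is handy to log from the cron job.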

What’s possible with this setup?

The basics are easy, e.g. query by IP:

curl -s http://localhost:9200/ipoid/_search \
-H 'Content-Type: application/json' \
-d' { "query": { "term": { "ip": "some ip" } } }'

… and so is querying by an IPv4 or IPv6 range in CIDR notation:

curl -s http://localhost:9200/ipoid/_search \
-H 'Content-Type: application/json' \
-d' { "query": { "term": { "ip": "some ip/24" } } }'

Since we mapped the ip field as an IP address data type, CIDR range matching works without needing to do anything else.
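For readers less familiar with CIDR notation, this short Python sketch shows which addresses a query such as "192.0.2.0/24" selects. It uses the standard ipaddress module to illustrate the matching rule only; it is not how OpenSearch implements it, and the function name is made up.

```python
import ipaddress


def matches_cidr(ip, cidr):
    """Return True if ip falls inside the network written in CIDR notation."""
    # strict=False accepts a host address with a prefix, e.g. "192.0.2.5/24".
    network = ipaddress.ip_network(cidr, strict=False)
    address = ipaddress.ip_address(ip)
    # An IPv4 address never matches an IPv6 network, and vice versa.
    return address.version == network.version and address in network


in_range = matches_cidr("192.0.2.77", "192.0.2.0/24")      # same /24
out_of_range = matches_cidr("192.0.3.1", "192.0.2.0/24")   # neighbouring /24
ipv6_match = matches_cidr("2001:db8::1", "2001:db8::/32")  # IPv6 works the same way
```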

As we may only care about finding IPs that show up in the latest data, we can easily filter by timestamp:

curl -s http://localhost:9200/ipoid/_search \
-H 'Content-Type: application/json' \
-d' { "query": { "bool": { 
"filter": [ { "term": { "ip": "some ip" } }, 
{ "range": { "@timestamp": { "gte": "now-24h" } } } 
] } } }'
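Hand-writing nested JSON inside a shell string gets error-prone quickly. As a sketch, the same filter can be built in Python and serialized with json.dumps; the function name and the max_age parameter are illustrative assumptions.

```python
import json


def latest_ip_query(ip_or_cidr, max_age="24h"):
    """Match an IP (or CIDR range) seen in the feed within the last max_age."""
    return {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"ip": ip_or_cidr}},
                    {"range": {"@timestamp": {"gte": f"now-{max_age}"}}},
                ]
            }
        }
    }


# Serialized body, ready to pass to curl -d or any HTTP client.
body = json.dumps(latest_ip_query("192.0.2.1"))
```

Both clauses sit in the filter context, so they do not contribute to scoring.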

And it’s easy and fast to query by e.g. client.proxies and other fields in the dataset. There are more fun queries to try out in the OpenSearch docs.