Loading Spur JSONL data into OpenSearch
2024-06-30 22:54:13 +0200
I recently had the experience of working out how to import a daily feed of tens of millions of lines of JSONL data into OpenSearch.
Background
Spur.us offers a data feed of IP address intelligence data. Each day, you can download a file with roughly 30 million lines of data in JSONL format.
I wanted to be able to load this data such that:
- it’s easy to query for the latest data
- it’s possible to query across dimensions in the data, e.g. “Show me all IPs with proxy label LUMINATI_PROXY”
On their site, Spur says:
Our feeds are designed to be easily ingested into most data-lake or cloud-native database solutions.
Turns out they’re right: it’s very easily done.
Setup
I set up a new server and installed OpenSearch using the Debian install guide.
Then I set up these field mappings on the index:
{
  "mappings": {
    "properties": {
      "ip": {
        "type": "ip"
      },
      "tunnels.exits": {
        "type": "ip"
      },
      "tunnels.entries": {
        "type": "ip"
      },
      "client.concentration.geohash": {
        "type": "geo_point"
      }
    }
  }
}
We’re not making use of the geohash field, but the other mappings are important: they tell OpenSearch to treat ip, tunnels.exits, and tunnels.entries as the IP address field type.
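For reference, the mappings can be applied when creating the index. Here’s a minimal sketch that assumes the index name ipoid (used later in the Benthos config) and the default admin user on a self-signed TLS install; adjust credentials to your setup:
# Create the ipoid index with the mappings above.
# -k skips verification of the self-signed certificate,
# much like skip_cert_verify in the Benthos config below.
curl -s -k -u admin:"$OPENSEARCH_PASSWORD" -X PUT "https://localhost:9200/ipoid" \
  -H 'Content-Type: application/json' \
  -d '{
    "mappings": {
      "properties": {
        "ip": { "type": "ip" },
        "tunnels.exits": { "type": "ip" },
        "tunnels.entries": { "type": "ip" },
        "client.concentration.geohash": { "type": "geo_point" }
      }
    }
  }'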
The import
I installed Benthos and created a config file at $HOME/config.yaml. The relevant parts are:
input:
  label: ""
  read_until:
    idle_timeout: 600s
    input:
      file:
        paths: [ ./feed/feed.json ]
        scanner:
          lines: {}

pipeline:
  threads: -1
  processors:
    - awk:
        codec: none
        program: |
          json_set("@timestamp", timestamp_format(timestamp_unix()));

output:
  opensearch:
    urls: [ https://localhost:9200 ]
    index: "ipoid"
    tls:
      enabled: true
      skip_cert_verify: true
    basic_auth:
      enabled: true
      username: {REDACTED}
      password: {REDACTED}
    action: "index"
    id: '${! json("ip") }'
    max_in_flight: 64
    batching:
      count: 100
      byte_size: 0
      period: ""
      check: ""
In the processors section, I added a @timestamp field, which indicates the date of last update for a given IP address. That’s quite useful because, in the daily feed of data from Spur, it’s pretty common for IP addresses to drop out of the dataset and reappear later.
Note also that the id property here sets the ip field in the JSONL entry as the ID for a given OpenSearch record, so re-importing a line updates the existing document rather than creating a duplicate.
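To make that concrete, here’s what a hypothetical record might look like once indexed. The field values are invented for illustration; only the field names come from the mapping and config above:
{
  "_index": "ipoid",
  "_id": "203.0.113.7",
  "_source": {
    "ip": "203.0.113.7",
    "tunnels": { "exits": [ "198.51.100.4" ] },
    "@timestamp": "2024-06-30T04:12:55Z"
  }
}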
I downloaded the daily feed file from Spur and extracted it to feed/feed.json.
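That step looks roughly like this; the exact feed URL and the Token auth header depend on your Spur subscription, so treat this as a sketch:
# Hypothetical download step -- the endpoint and header are assumptions
# based on Spur's feed API; adjust to your subscription.
mkdir -p feed
curl -s -H "Token: $SPUR_API_TOKEN" \
  "https://feeds.spur.us/v2/anonymous/latest.json.gz" \
  -o feed/feed.json.gz
gunzip -f feed/feed.json.gz  # extracts to feed/feed.json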
Then I ran benthos -c ./config.yaml, and the import completed in about 50 minutes.
kharlan@ipoidopensearch:~$ curl -s "https://localhost:9200/_cat/indices?v"
health status index uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   ipoid 0_N5JD_2Qmmpqa4XAb3qDg   1   1   37015900            0      6.9gb          6.9gb
It worked! From there, all I had to do for daily imports was define a cron entry to download the feed and run the benthos command each day.
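For example, the cron entry might look like this, with a hypothetical wrapper script that downloads the feed and then runs Benthos:
# Hypothetical crontab entry: fetch and import the feed daily at 04:00.
0 4 * * * $HOME/import-feed.sh >> /var/log/ipoid-import.log 2>&1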
Cleaning up
As noted earlier, IPs can drop out of the dataset and reappear later. That means if you run this import daily, you’ll eventually accumulate a big pile of IPs that haven’t been seen in the Spur dataset for weeks. Let’s clean those up with a daily purge script:
curl -s -X POST "http://localhost:9200/ipoid/_delete_by_query" \
  -H 'Content-Type: application/json' -d '{
    "query": {
      "range": {
        "@timestamp": {
          "lt": "now-6w"
        }
      }
    }
  }'
This query deletes all records whose @timestamp property is older than six weeks; it also runs in a daily cron job.
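That entry is equally simple; again, the script name here is invented:
# Hypothetical crontab entry: purge stale records daily at 05:00.
0 5 * * * $HOME/purge-stale-ips.sh >> /var/log/ipoid-purge.log 2>&1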
What’s possible with this setup?
The basics are easy, e.g. query by IP:
curl -s http://localhost:9200/ipoid/_search \
  -H 'Content-Type: application/json' \
  -d '{ "query": { "term": { "ip": "some ip" } } }'
… and so is querying by IPv4 or IPv6 range:
curl -s http://localhost:9200/ipoid/_search \
  -H 'Content-Type: application/json' \
  -d '{ "query": { "term": { "ip": "some ip/24" } } }'
Since we defined the ip fields as the IP address data type, range support works without needing to do anything else.
As we may only care about finding IPs that show up in the latest data, we can easily filter by timestamp:
curl -s http://localhost:9200/ipoid/_search \
  -H 'Content-Type: application/json' \
  -d '{ "query": { "bool": {
    "filter": [
      { "term": { "ip": "some ip" } },
      { "range": { "@timestamp": { "gte": "now-24h" } } }
    ] } } }'
And it’s easy and fast to query by e.g. client.proxies and other fields in the dataset. There are more fun queries to try out in the OpenSearch docs.
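For instance, here’s a sketch of counting all IPs carrying the LUMINATI_PROXY label, assuming client.proxies is indexed as a keyword field:
# "size": 0 skips returning the matching documents themselves;
# track_total_hits returns the exact count instead of capping at 10,000.
curl -s http://localhost:9200/ipoid/_search \
  -H 'Content-Type: application/json' \
  -d '{
    "size": 0,
    "track_total_hits": true,
    "query": { "term": { "client.proxies": "LUMINATI_PROXY" } }
  }'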