Ghost edits will be made if I have something more to add.
DISCLAIMER. The following tips are geared more towards projects that deal with large-scale time series data scraping. Applying these notes to a small script might be overkill.
These are some of the things I've learned after spending the last year writing time series data scrapers at my current job. I have not counted how many sources of data we are scraping, but the LOC count for the scrapers could be somewhere around 18-20k.
Python and Go code will be used for example purposes.
An open-source repo implementing some of these tips will be available here in the future.
The biggest mistake that I've made in terms of structure is not following a clear separation between the scraper implementation and the database layer. Here's an example to illustrate what I mean.
Imagine we have an external API that provides us with time series data. The API has an endpoint from which you need to fetch the data, parse it, and save it into the database every x minutes.
If you're tasked with this, a reasonable flow could look like this:
import requests
from datetime import timedelta

def create_url():
    latest_start_time = get_latest_start_time_from_db()
    end_time = latest_start_time + timedelta(days=5)
    return f"http://example.com/api/endpoint_x?from={latest_start_time}&to={end_time}"

def parse_response_to_db_rows(data):
    # parse the data into database rows
    return parsed_data

def scraper_endpoint_x():
    # create the url with the start and end request params
    url = create_url()
    # request the data
    data = requests.get(url).json()
    # parse it into database rows
    db_rows = parse_response_to_db_rows(data)
    # save the database rows to the database
    save_db_rows(db_rows)
The problem with this approach is the lack of a clear separation between requesting, parsing, and storing the data in the database. This could lead to a lot of code duplication if we need to migrate from one database to another.
While this may not sound too bad in theory, imagine that we have written 50 scrapers without a standardized flow for requesting and parsing data. If, after some time, there is a global decision to switch from one database (e.g., MongoDB) to another (e.g., PostgreSQL), we would need to refactor and copy-paste a lot of existing code to support this change.
Copy-pasting and editing is an option, but rewriting large chunks of valid code because of a database change is very dull work and can take a lot of time.
To keep things simple and flexible when implementing scrapers, you really just need to do a single thing: write functions that look like this:
# request endpoint x and return the parsed data
# ("from" is a reserved word in Python, so the params are named from_time and to_time)
def endpoint_x(from_time, to_time, **additional_params) -> SharedType:
    url = f"http://example.com/api/endpoint_x?from={from_time}&to={to_time}"
    data = requests.get(url).json()
    parsed_data = parse_endpoint_x_to_shared_type(data)
    return parsed_data
The only real dependency for the endpoint_x function should be the returned parsed type. More on this later.
Implementing scrapers in this way has three crucial benefits:
If a function is solely responsible for fetching and converting data to a single reusable type without any database dependency, it simplifies the process of migrating from one database to another.
# call endpoint x based on the last start time, parse the data and insert it into mongodb
def scrape_endpoint_x_for_mongodb():
    latest_start_time = get_latest_start_time_from_mongodb()
    end_time = latest_start_time + timedelta(days=5)
    parsed_data = endpoint_x(latest_start_time, end_time)
    db_rows = convert_parsed_data_to_mongodb_documents(parsed_data)
    insert_documents_into_mongodb(db_rows)
# call endpoint x based on the last start time, parse the data and insert it into postgres
def scrape_endpoint_x_for_postgres():
    # same as above; the only things that change are the latest start time
    # retrieval, conversion, and insertion of the data
    ...
As we're dealing with time series data, it's possible for it to have gaps.
Let's assume that the endpoints we scrape provide a lot of data and we can't just rescrape everything from scratch, because that would take too long to execute. One way to fix this is by finding all of the possible gaps in our data from the database and filling them with the help of the defined endpoint_x function, which only needs the from and to values.
To make life simpler, it's a good idea to keep the same field names for the start time and the interval of the measurement across tables, because that makes the function responsible for finding gaps in the selected database table much easier to write (to be fair, this is mostly a problem for strictly typed languages).
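As a rough sketch of such a gap finder (assuming rows sorted by start_time that all share one interval in milliseconds; the helper name is made up):

from datetime import timedelta

def find_gaps(rows, interval_ms):
    # rows are dicts with a start_time field, sorted ascending, all with the same interval
    step = timedelta(milliseconds=interval_ms)
    gaps = []
    for previous, current in zip(rows, rows[1:]):
        expected = previous["start_time"] + step
        if current["start_time"] > expected:
            # the missing range can be passed straight to endpoint_x(from, to)
            gaps.append((expected, current["start_time"]))
    return gaps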
If we have a lot of tests for various scrapers, we need to optimize their execution speed. One way to do this is by using short request periods when running tests, reducing the amount of data we handle. This is easily achievable when you can define the requestable period in the endpoint_x function.
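As an illustration only (it assumes the endpoint_x function from above and parsed rows that expose a start_time field), a test could pin the window to a single hour:

from datetime import datetime, timedelta

def test_endpoint_x_parses_a_short_window():
    # a deliberately short period keeps the request and the parsing fast
    to_time = datetime(2024, 6, 29)
    from_time = to_time - timedelta(hours=1)
    rows = endpoint_x(from_time, to_time)
    assert all(from_time <= row.start_time <= to_time for row in rows)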
Let's assume that we have an API endpoint which only allows us to request data based on a single date param, like api.com/endpoint_x?date=2024-06-29. It is still better to implement the endpoint_x function in a way that accepts from and to params, because this logic can be internalized into the function.
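A possible sketch of that internalization (the single-date API is the hypothetical one above, and the parsing helper is reused from the earlier example):

import requests
from datetime import date, timedelta

def endpoint_x(from_date: date, to_date: date) -> list:
    # the source only accepts ?date=..., so loop over the days inside [from_date, to_date]
    parsed_data = []
    current = from_date
    while current <= to_date:
        response = requests.get(f"http://api.com/endpoint_x?date={current.isoformat()}")
        parsed_data.extend(parse_endpoint_x_to_shared_type(response.json()))
        current += timedelta(days=1)
    return parsed_data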
Assuming we scrape multiple sources whose data is stored as the same type in the database, it's better to create a package that holds all of these types, which can be shared by the scraper implementation layer.
This is where MongoDB can be a great initial database, because the defined types can serve both as the document schema stored in the database and as the parsed type definition.
To standardize the type system, it's a good idea to create a type / function that always gets called when creating new data. Here's an example:
// Base type for time series data
type TimeseriesFields struct {
    // StartTime is the start of the measurement
    StartTime time.Time `json:"start_time" bson:"start_time"`
    // Interval is the frequency of the measurement in milliseconds (could also be called Resolution)
    Interval int64 `json:"interval" bson:"interval"`
}

// NewFields creates a new TimeseriesFields struct with verbose error handling
func NewFields(startTime, endTime time.Time) (TimeseriesFields, error) {
    if startTime.IsZero() {
        return TimeseriesFields{}, errors.New("start time is zero")
    }
    if endTime.IsZero() {
        return TimeseriesFields{}, errors.New("end time is zero")
    }
    if startTime.After(endTime) {
        return TimeseriesFields{}, errors.New("start time is after end time")
    }
    if startTime.Equal(endTime) {
        return TimeseriesFields{}, errors.New("start time is equal to end time")
    }
    return TimeseriesFields{
        StartTime: startTime.UTC(),
        Interval:  timeset.DiffInMilliseconds(startTime, endTime),
    }, nil
}
// Type which holds the parsed data
type ParsedTimeseriesData struct {
    TimeseriesFields `json:",inline" bson:",inline"`

    ParsedValue1 float64 `json:"parsed_value_1" bson:"parsed_value_1"`
    ParsedValue2 float64 `json:"parsed_value_2" bson:"parsed_value_2"`
}
Defining these primitives fixes multiple problems:
Upserts mean: insert the data if it does not exist, update it if it does.
When dealing with time series data, upserts are essential to avoid duplicate rows.
MongoDB has native support for upserts. For SQL databases, you can use the ON CONFLICT clause.
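A rough sketch with pymongo (the collection and field names here are assumptions), keyed on start_time and interval; the PostgreSQL equivalent would be an INSERT ... ON CONFLICT (start_time, interval) DO UPDATE:

from pymongo import MongoClient, UpdateOne

collection = MongoClient("mongodb://localhost:27017")["scrapers"]["endpoint_x"]

def upsert_rows(rows):
    # insert each row if it does not exist, update it if it does
    operations = [
        UpdateOne(
            {"start_time": row["start_time"], "interval": row["interval"]},
            {"$set": row},
            upsert=True,
        )
        for row in rows
    ]
    if operations:
        collection.bulk_write(operations)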
It's good practice to create an overlap window which gets refetched every time you scrape data. This is because:
The meaning of the field which holds the time can vary from source to source. The base assumption that the time field is equal to the start of the measurement is not always true. Some places use the end time of the measurement as the time field.
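For example, if a hypothetical source stamps each row with the end of the measurement, the rows can be shifted back by one interval on ingestion, something like:

from datetime import timedelta

def normalize_to_start_time(rows, interval_ms):
    # the source's "time" is the end of the measurement; convert it to our start_time convention
    step = timedelta(milliseconds=interval_ms)
    for row in rows:
        row["start_time"] = row.pop("time") - step
    return rows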
The end goal is to write code that remains untouched after it's written. One way to increase the chances of this is to avoid hardcoded default start times from which the scrapers make requests, because the data source may delete old data, which would make the hardcoded start time invalid after some time.
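One possible sketch (the lookback value and the empty-database behaviour are assumptions): fall back to a relative window instead of a fixed historical date when nothing has been scraped yet.

from datetime import datetime, timedelta, timezone

DEFAULT_LOOKBACK = timedelta(days=30)  # assumed retention window, not a fixed calendar date

def resolve_start_time():
    latest = get_latest_start_time_from_db()  # assumed to return None when the table is empty
    if latest is not None:
        return latest
    return datetime.now(timezone.utc) - DEFAULT_LOOKBACK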
Before implementing scraping logic, check:
transform.tools is a great resource for quickly converting API responses to type definitions in your language of choice.
However, there's an edge case to be aware of. When fetching data from endpoint x for period t1 -> t2, the response type might change for different periods.
// Parsed response when requesting from t1 to t2
type EndpointXResponse1 struct {
    Data []struct {
        Time  string  `json:"time"`
        Value float64 `json:"value"`
    } `json:"data"`
}
// Parsed response when requesting from t3 to t4
type EndpointXResponse2 struct {
    Data []struct {
        Time  any     `json:"time"`
        Value float64 `json:"value"`
    } `json:"data"`
}
Assuming the returned type remains consistent for all periods can be risky. One way to avoid this is by setting such fields to any and using reflection to check whether the runtime type is the one you expect.
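In Go that check would be a type switch or reflection on the any field; a rough Python analogue (the field shapes are assumptions) is a runtime check before parsing:

from datetime import datetime, timezone

def parse_time_field(value) -> datetime:
    # the hypothetical source sometimes returns an ISO string and sometimes a unix timestamp
    if isinstance(value, str):
        return datetime.fromisoformat(value)
    if isinstance(value, (int, float)):
        return datetime.fromtimestamp(value, tz=timezone.utc)
    raise TypeError(f"unsupported time field type: {type(value)!r}")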