A guide for writing large time series data scraper projects

2024-06-29

Ghost edits will be made, if I have something more to add.

DISCLAIMER. The following tips are geared more towards projects that deal with large-scale time series data scraping. Applying these notes to a small script might be overkill.

These are some of the things I've learned after spending the last year writing time series data scrapers at my current job. I have not counted how many data sources we scrape, but the scrapers add up to roughly 18-20k lines of code.


Python and Go code will be used for example purposes.


An open-source repo implementing some of these tips will be available in the future, which can be found here.

Base structure

The biggest mistake that I've made in terms of structure is not following a clear separation between the scraper implementation and the database layer. Here's an example to illustrate what I mean.


Imagine we have an external API that provides us with time series data. The API has an endpoint, and you need to parse the data and save it into the database every x minutes.


If you're tasked with this, a reasonable flow could look like this:

from datetime import timedelta

import requests

def create_url():
    latest_start_time = get_latest_start_time_from_db()
    end_time = latest_start_time + timedelta(days=5)
    return f"http://example.com/api/endpoint_x?from={latest_start_time}&to={end_time}"

def parse_response_to_db_rows(data):
    # parse the data to the database rows (source-specific, omitted here)
    parsed_data = ...
    return parsed_data

def scraper_endpoint_x():
    # create the url with the start and end request params
    url = create_url()
    # request the data
    data = requests.get(url)
    # parse it to the database rows
    db_rows = parse_response_to_db_rows(data)
    # save the database rows to the database
    save_db_rows(db_rows)

The problem with this approach is the lack of a clear separation between requesting, parsing, and storing the data in the database. This could lead to a lot of code duplication if we need to migrate from one database to another.


While this may not sound too bad in theory, imagine that we have written 50 scrapers without a standardized flow for requesting and parsing data. If, after some time, there is a global decision to switch from one database (e.g., MongoDB) to another (e.g., PostgreSQL), we would need to refactor and copy-paste a lot of existing code to support this change.


Copy-pasting and editing is an option, but rewriting large chunks of valid code because of a database change is very dull work and can take a lot of time.

Example of a better structure

To keep things simple and flexible when implementing scrapers, you really just need to do a single thing. Write functions which look like this:

import requests

# request endpoint x and return the parsed data
# ("from" is a reserved word in Python, hence the trailing underscore)
def endpoint_x(from_, to, **additional_params) -> SharedType:
    url = f"http://example.com/api/endpoint_x?from={from_}&to={to}"
    data = requests.get(url)
    parsed_data = parse_endpoint_x_to_shared_type(data)
    return parsed_data

The only real dependency for the endpoint_x function should be the returned parsed type. More on this later.


Implementing scrapers in this way has three crucial benefits:


Implementing the logic based on databases

If a function is solely responsible for fetching and converting data to a single reusable type without any database dependency, it simplifies the process of migrating from one database to another.

from datetime import timedelta

# call endpoint x based on the last start time, parse the data and insert it into MongoDB
def scrape_endpoint_x_for_mongodb():
    latest_start_time = get_latest_start_time_from_mongodb()
    end_time = latest_start_time + timedelta(days=5)

    parsed_data = endpoint_x(latest_start_time, end_time)
    db_rows = convert_parsed_data_to_mongodb_documents(parsed_data)
    insert_documents_into_mongodb(db_rows)

# call endpoint x based on the last start time, parse the data and insert it into PostgreSQL
def scrape_endpoint_x_for_postgres():
    # same as above. The only things that change are latest start time
    # retrieval, conversion and insertion of the data
    ...

Gap filling

As we're dealing with time series data, it's possible for it to have gaps.


Let's assume that the endpoints we scrape provide a lot of data and we can't just rescrape everything from scratch, because that would take too long. One way to fix this is to find all of the gaps in our data in the database and fill them with the help of the endpoint_x function defined above, which only needs the from and to values.


To make life simpler, keep the same field names for the start time and the interval of the measurement in every table. The function responsible for finding gaps in a given database table is then much easier to write (to be fair, this is mostly a problem in strictly typed languages).
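
Here is a minimal sketch of such a gap finder in Python. It assumes the rows were already loaded from the database as (start_time, interval in milliseconds) pairs; the function name and the tuple layout are made up for this example.

```python
from datetime import datetime, timedelta, timezone


def find_gaps(rows, range_start, range_end):
    """Return (gap_start, gap_end) pairs in [range_start, range_end) that no row covers.

    Each row is assumed to be a (start_time, interval_ms) tuple.
    """
    gaps = []
    cursor = range_start  # everything before the cursor is already covered
    for start_time, interval_ms in sorted(rows):
        if start_time >= range_end:
            break
        if start_time > cursor:
            gaps.append((cursor, start_time))
        row_end = start_time + timedelta(milliseconds=interval_ms)
        cursor = max(cursor, row_end)
    if cursor < range_end:
        gaps.append((cursor, range_end))
    return gaps
```

Each returned pair can be passed straight to endpoint_x as its from and to values.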


Test optimization

If we have a lot of tests for various scrapers, we need to optimize their execution speed. One way to do this is by using short request periods when running tests, reducing the amount of data we handle. This is easily achievable when you can define the requestable period in the endpoint_x function.
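
A rough sketch of what such a test could look like. To keep it self-contained, this endpoint_x takes the request step as a fetch argument and the test passes in a stub; both the extra argument and the stub are assumptions made for the example, not part of the structure described above.

```python
from datetime import datetime, timedelta, timezone


def endpoint_x(from_, to, fetch):
    """Hypothetical endpoint function; `fetch` performs the actual request."""
    return [{"start_time": t, "value": v} for t, v in fetch(from_, to)]


def fake_fetch(from_, to):
    """Test stub: one (time, value) pair for every full hour in [from_, to)."""
    hours = int((to - from_) / timedelta(hours=1))
    return [(from_ + timedelta(hours=i), float(i)) for i in range(hours)]


def test_endpoint_x_short_period():
    from_ = datetime(2024, 6, 29, tzinfo=timezone.utc)
    # Two hours instead of the five days a production run would request.
    rows = endpoint_x(from_, from_ + timedelta(hours=2), fake_fetch)
    assert len(rows) == 2
    assert rows[0]["start_time"] == from_
```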


Additional notes

Let's assume that we have an API endpoint which only lets us request data for a single date param, like api.com/endpoint_x?date=2024-06-29. It is still better to implement the endpoint_x function so that it accepts from and to params, because the per-date looping can be internalized into the function.
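
One possible way to internalize that loop, as a sketch. The fetch_day callable is hypothetical and stands in for "request and parse a single date"; the range is treated as half-open, which is my own choice here.

```python
from datetime import date, timedelta


def dates_between(from_, to):
    """Yield every calendar date in the half-open range [from_, to)."""
    current = from_
    while current < to:
        yield current
        current += timedelta(days=1)


def endpoint_x(from_, to, fetch_day):
    """Collect rows for [from_, to) from an API that only accepts one date.

    `fetch_day` is a hypothetical callable that requests and parses a
    single day, e.g. api.com/endpoint_x?date=2024-06-29.
    """
    rows = []
    for day in dates_between(from_, to):
        rows.extend(fetch_day(day))
    return rows
```

The caller still only thinks in from and to, so gap filling and short test periods work the same way as for any other endpoint.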

Type system

Assuming we scrape multiple places with the same type stored in the database, it's better to create a package that stores all these types, which can be shared by the scraper implementation layer.


This is where MongoDB can be a great initial database, because the defined types can serve as both the document schema stored in the database and the parsed type definition.

Base type

To standardize the type system, it's a good idea to create a type / function that always gets called when creating new data. Here's an example:

import (
    "errors"
    "time"
)

// Base type for time series data
type TimeseriesFields struct {
    // StartTime is the start of the measurement
    StartTime time.Time `json:"start_time" bson:"start_time"`
    // Interval is the length of the measurement period in milliseconds (could also be called Resolution)
    Interval int64 `json:"interval" bson:"interval"`
}

// NewFields creates a new TimeseriesFields struct with verbose error handling
func NewFields(startTime, endTime time.Time) (TimeseriesFields, error) {
    if startTime.IsZero() {
        return TimeseriesFields{}, errors.New("start time is zero")
    }

    if endTime.IsZero() {
        return TimeseriesFields{}, errors.New("end time is zero")
    }

    if startTime.After(endTime) {
        return TimeseriesFields{}, errors.New("start time is after end time")
    }

    if startTime.Equal(endTime) {
        return TimeseriesFields{}, errors.New("start time is equal to end time")
    }

    return TimeseriesFields{
        StartTime: startTime.UTC(),
        // difference between end and start in milliseconds
        Interval: endTime.Sub(startTime).Milliseconds(),
    }, nil
}

// Type which holds the parsed data
type ParsedTimeseriesData struct {
    TimeseriesFields `json:",inline" bson:",inline"`
    ParsedValue1    float64 `json:"parsed_value_1" bson:"parsed_value_1"`
    ParsedValue2    float64 `json:"parsed_value_2" bson:"parsed_value_2"`
}

Defining these primitives fixes multiple problems:

  • Timezone standardization (always UTC, avoids DST problems)
  • Field name standardization (always the same field name for the same type of data)
  • Reusability / DRY principle

Upserts are your friend

Upserts mean "insert the data if it does not exist, update it if it does".

When dealing with time series data, upserts are essential to avoid duplicate rows.


MongoDB has native support for upserts. In PostgreSQL and SQLite, you can use the ON CONFLICT clause; other SQL databases have their own equivalents, such as ON DUPLICATE KEY UPDATE in MySQL.
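
As a small illustration, here is an upsert using Python's built-in sqlite3 module (this needs SQLite 3.24 or newer; the clause has the same shape in PostgreSQL). The table and column names are made up for the example, and the primary key on start time plus interval is an assumption about what identifies a row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE measurements (
        start_time  TEXT    NOT NULL,
        interval_ms INTEGER NOT NULL,
        value       REAL    NOT NULL,
        PRIMARY KEY (start_time, interval_ms)
    )
    """
)

UPSERT_SQL = """
INSERT INTO measurements (start_time, interval_ms, value)
VALUES (?, ?, ?)
ON CONFLICT (start_time, interval_ms) DO UPDATE SET value = excluded.value
"""


def upsert_rows(conn, rows):
    """Insert new rows and overwrite the value of rows that already exist."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT_SQL, rows)
```

Scraping the same period twice then leaves one row per measurement, holding the most recently fetched value.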

Be liberal in the period you rescrape

It's good practice to create an overlap window which gets refetched every time you scrape data. This is because:

  • The data source may update the previous data.
  • Querying the data based on URL params can be unstable, as it's not guaranteed which timezone will be applied to them.
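
A tiny sketch of how the request period could be computed with such a window. The one-day overlap and five-day step are placeholder values, not recommendations; the right numbers depend on the data source.

```python
from datetime import datetime, timedelta, timezone


def request_window(latest_start_time, overlap=timedelta(days=1), step=timedelta(days=5)):
    """Return the (from, to) period for the next scrape.

    The period starts `overlap` before the newest stored row, so recent
    data is fetched again, and ends `step` after it.
    """
    return latest_start_time - overlap, latest_start_time + step
```

Because the overlapping rows are already in the database, this only works cleanly together with upserts.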

Gotchas


Meaning of time fields in the data sources

The meaning of the field which holds the time can vary from source to source. The base assumption that the time field is equal to the start of the measurement is not always true. Some places use the end time of the measurement as the time field.
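
If every stored row is supposed to carry the start of the measurement, the conversion can live in one small helper. This is only a sketch: the marks_end flag is a made-up per-source setting, and it assumes the measurement interval is known.

```python
from datetime import datetime, timedelta, timezone


def to_start_time(timestamp, interval, marks_end):
    """Return the start of the measurement for a timestamp from the source.

    `marks_end` says whether this source stamps a measurement with its
    end time instead of its start time.
    """
    return timestamp - interval if marks_end else timestamp
```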


Don't hardcode the start time fields

The end goal is to write code that remains untouched after it's written. One way to make this more likely is to avoid hardcoded default start times for the scrapers' first requests. The data source may delete old data, which would make a hardcoded start time invalid after some time.


Timezones and DST changes

Before implementing scraping logic, check:

  • The timezone of the data source (if not indicated in the field)
  • How the data source handles DST changes when the timezone is not indicated in the field. If they use a custom way of indicating the change, then parsing the time values can get a lot more complicated.

Defining types for the API responses

transform.tools is a great resource for quickly converting API responses to the type definition in your language of choice.


However, there's an edge case to be aware of. When fetching data from endpoint x for period t1 -> t2, the response type might change for different periods.

// Parsed response when requesting from t1 to t2
type EndpointXResponse1 struct {
  Data []struct {
    Time string `json:"time"`
    Value float64 `json:"value"`
  } `json:"data"`
}

// Parsed response when requesting from t3 to t4
type EndpointXResponse2 struct {
  Data []struct {
    Time any `json:"time"`
    Value float64 `json:"value"`
  } `json:"data"`
}

Assuming the returned type remains consistent for all periods can be risky. One way to guard against this is to declare the affected fields as any and check their concrete type at runtime (for example with a type switch or reflection) before using them.
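
The same idea in Python, as a sketch. It assumes the time field arrives either as an ISO 8601 string or as a Unix timestamp in seconds, and that strings without an offset are already UTC; both are assumptions that need checking against the real source.

```python
from datetime import datetime, timezone


def parse_time(value):
    """Parse a `time` field whose JSON type is not guaranteed."""
    # bool is a subclass of int in Python, so reject it explicitly
    if isinstance(value, bool):
        raise TypeError(f"unsupported time value: {value!r}")
    if isinstance(value, (int, float)):
        # assumption: numbers are Unix timestamps in seconds
        return datetime.fromtimestamp(value, tz=timezone.utc)
    if isinstance(value, str):
        parsed = datetime.fromisoformat(value)
        if parsed.tzinfo is None:
            # assumption: strings without an offset are already UTC
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)
    raise TypeError(f"unsupported time value: {value!r}")
```

Anything unexpected raises instead of being stored silently, which is usually what you want from a scraper.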

Conclusion

  • Separate the API implementation and scraper implementation.
  • Share placeholder types between API implementations.
  • Use upserts.
  • Be paranoid.