on building a scalable ethereum event scanner

tl;dr

today i go over an mvp for a scalable event scanner for ethereum, built by indexing and parsing block events.

this is step 0 for building distributed systems that allow training machine learning models on on-chain data (e.g., high-frequency trading with deep learning) and other shenanigans.

this is one data engineering approach: for when you consciously decide to build a data lake and want to learn more about how blockchains emit logs. this project can be built in one day. for a rust agent that performs websocket monitoring on cexes, plus other approaches, check go-outside-labs’ kiddor-searcher-bot.


today you will learn how to:

  1. leverage python to build a cli tool that indexes transfer events for a particular token through eth_getLogs,

  2. prepare the data to be ingested and loaded into a simple nosql database (mongodb),

  3. build and deploy a restful api for fast balance and ownership statistics retrieval with python’s fastapi (while looking at asynchronous performance).

mvp for a token transfer event scanner.

in future posts, i shall review the next steps of a production-grade scanner, for instance:

  • how to create graphql structures with the graph

  • how to index blockchain(s) data into a graph database, such as neo4j, or high-performance nosql databases, such as apache cassandra, or apache arrow data structures

  • an in-depth exploration of google’s (nosql) bigtable projects (e.g., by the blockchain-etl group), and apache airflow dags for exporting, loading, and parsing data

  • an in-depth look at state-of-the-art distributed sql query engines, such as trino

  • how to implement distributed event streaming (e.g., with apache kafka, amazon sqs and sns, or rabbitmq)

  • how to index local nodes (e.g., with a local running execution client, as opposed to using external rpc urls)

  • how to simultaneously index other tokens and alt chains

  • how to deploy this pipeline through kubernetes and terraform as it scales to over 100 million entries

system design for a blockchain intelligence data platform (all deployed on kubernetes).



🦅 a cli tool to index a token’s events

in this post, we are building a data engineering tool to index and process block data on ethereum. by the end, you should have a full api deployed in the cloud and a local cli tool:

the scanner cli, which is installed under the package name "indexer".

to get started, either clone my code or write your own files as we review this post.

installing dependencies

create a venv, either using virtualenv, pipenv, or poetry.

because of some of the dependencies in this code, we will be developing in a python 3.9 environment (install it here if you don’t have that version on disk):

> virtualenv -p /usr/local/bin/python3.9 venv
> source venv/bin/activate
> make install_dep

add environment variables

now, create a .env file and add an RPC_PROVIDER_URL to connect to ethereum mainnet nodes (for example, from this list):

> cp .env.example .env
> vim .env

project structure

this is what the final project structure looks like (you get this by running tree):

> tree .

installing the package

> make install
> indexer -h 

🪙 Token indexer and API.
(...)

the main class

the entry point of this project is through src/main.py:

src/main.py:
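as a reference, here is a minimal sketch of what this entry point could look like (the flags mirror the cli examples in this post; the helper calls are hypothetical):

# minimal sketch of a cli entry point; flags mirror the examples in this post, helpers are hypothetical
import argparse


def run_cli() -> None:
    parser = argparse.ArgumentParser(description='🪙 Token indexer and API.')
    parser.add_argument('-e', dest='index', action='store_true',
                        help='index historical transfer events')
    parser.add_argument('-p', dest='process', metavar='FILE',
                        help='process a raw events json file into balances')
    parser.add_argument('-d', dest='db', metavar='FILE',
                        help='load a balances json file into the database')
    parser.add_argument('-a', dest='api', action='store_true',
                        help='run the api server locally')
    parser.add_argument('-b', dest='balance', metavar='WALLET',
                        help='fetch the balance of a wallet')
    parser.add_argument('-t', dest='top', action='store_true',
                        help='fetch the top 100 token holders')
    args = parser.parse_args()

    if args.index:
        ...  # e.g., kick off the historical indexing (TokenIndexer, reviewed below)
    elif args.process:
        ...  # e.g., process the raw events file into balances


if __name__ == '__main__':
    run_cli()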

🦅 indexing historical transfer events

before we review the first class in this project, TokenIndexer (which retrieves the token’s historical transfer event data from the ethereum mainnet), we need to decide which token we would like to index.

in this example, i am looking at the doge nft (dog) token, an erc-20 token. feel free to add your favorite token’s contract address to the .env file:

TOKEN_CONTRACT = 0xBAac2B4491727D78D2b78815144570b9f2Fe8899

by the way, all environment variables are retrieved by a function defined inside src/utils/os_utils.py, shown below. note that the last method in the file, send_rpc_request(), issues post requests to ethereum’s json-rpc api:

src/utils/os_utils.py
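for reference, send_rpc_request() boils down to a post request against RPC_PROVIDER_URL. here is a minimal sketch, assuming a signature of (url, method, params), which may differ from the repo’s:

# sketch of a json-rpc post helper (the exact signature in the repo may differ)
import os

import requests
from dotenv import load_dotenv

load_dotenv()


def send_rpc_request(url: str, method: str, params: list):
    """send a post request to an ethereum json-rpc endpoint and return its 'result' field."""
    payload = {'jsonrpc': '2.0', 'method': method, 'params': params, 'id': 1}
    response = requests.post(url, json=payload, timeout=30)
    response.raise_for_status()
    return response.json().get('result')


if __name__ == '__main__':
    # usage example: fetch the latest block number from the provider configured in .env
    print(send_rpc_request(os.environ['RPC_PROVIDER_URL'], 'eth_blockNumber', []))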

finding out the contract’s abi (and dealing with proxies)

we also need the abi of the contract.

since the doge nft token contract sits behind a proxy, to be able to decode its events (e.g., Transfer()), we need to retrieve the abi of the implementation contract (while still querying the token contract’s original address).

we use the token’s contract address with the abi of the implementation contract.

how do you extract the abi from a contract?

(remember that the abi, the contract’s application binary interface, is the interface that specifies how to interact with this specific contract, including method names, parameters, constants, data structures, event types, etc.)

in this case, since the contract is verified on etherscan, we can call the endpoint below (which needs an api key that can be created here):

https://api.etherscan.io/api?
module=contract&
action=getabi&
address=0x7b0fce54574d9746414d11367f54c9ab94e53dca&
apikey=<etherscan api>

for non-verified contracts, you can try tools like this one.

paste the abi into a json file inside ./abi/.

finally, add the path for this file in the .env file:

TOKEN_CONTRACT_ABI = ./abi/<abi file.json>

fetching the data

to allow users to access transaction event data, the ethereum evm keeps an event log for every block’s transactions, which can be retrieved with the eth_getLogs json-rpc method. in other words, any time a transaction is mined, its event logs are emitted.

we are interested in Transfer() events, which are emitted whenever assets move between two addresses. each event log is identified by an event signature: the keccak-256 hash of the event’s canonical declaration, Transfer(address,address,uint256):

keccak256(Transfer(address,address,uint256)) = 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef
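you can double-check this topic hash with web3.py (or any keccak-256 implementation):

# recompute the transfer event topic: the keccak-256 hash of the canonical event signature
from web3 import Web3

topic = Web3.keccak(text='Transfer(address,address,uint256)').hex().removeprefix('0x')
print(topic)  # ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef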

this is an example of the data passed when calling eth_getLogs:

{
  'jsonrpc': '2.0', 
  'method': 'eth_getLogs', 
  'params': 
      [
        {
          'address':'0xBAac2B4491727D78D2b78815144570b9f2Fe8899', 
          'fromBlock': '0x1', 
          'toBlock': '0x1389', 
          'topics':  [
'0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'  
                    ]
       }
      ], 
 'id': 1
}

and this is an example of a response:

{
  'address': '0xbaac2b4491727d78d2b78815144570b9f2fe8899',  
  'blockHash':'0x543c387ba2d9b8173cba357d51f2f7329fcd02d475d9e116d749f8cdc203d763', 
  'blockNumber': '0xc85a3e', 
  'data':'0x000000000000000000000000000000000000000036d5011c02b1b33ec5840000', 
  'logIndex': '0x25f', 
  'removed': False, 
  'topics': ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef','0x0000000000000000000000000000000000000000000000000000000000000000','0x000000000000000000000000f5c27c6fe782cbb5c85989ea3e75754748153459'], 
  'transactionHash':'0x5e5d83fd2d43b3f7aab6f2b4c21a69dd6d804f59711dec13198d4792a4e5e158', 
  'transactionIndex': '0x199'
}

back to our TokenIndexer class, we can see how eth_getLogs is leveraged to retrieve the target transfer events:

src/blockchains/ethereum.py
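in essence, the indexer walks the chain in fixed-size block ranges (5000 blocks per chunk in the run shown later) and issues one eth_getLogs call per chunk. a simplified sketch of that loop, with illustrative names (not the repo’s exact api):

# simplified sketch of the chunked eth_getLogs loop (illustrative names, not the repo's exact api)
from src.utils.os_utils import send_rpc_request  # the json-rpc helper described above

TRANSFER_TOPIC = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'


def get_transfer_logs(url: str, contract: str, from_block: int, to_block: int,
                      chunk: int = 5000) -> list:
    """retrieve transfer event logs for a contract, one block range at a time."""
    logs = []
    for start in range(from_block, to_block, chunk):
        end = min(start + chunk, to_block)
        params = [{
            'address': contract,
            'fromBlock': hex(start),
            'toBlock': hex(end),
            'topics': [TRANSFER_TOPIC],
        }]
        logs += send_rpc_request(url, 'eth_getLogs', params)
    return logs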

note that this class uses math methods that are defined in the file below:

src/utils/arithmetics.py
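these are small helpers for parsing hex values and scaling raw uint256 amounts by the token’s decimals; roughly something like this (the names here are assumptions):

# assumed arithmetic helpers: hex parsing and decimal scaling of raw uint256 amounts
from decimal import Decimal, getcontext

getcontext().prec = 78  # enough precision for 256-bit integers


def convert_hex_to_int(value: str) -> int:
    """parse a 0x-prefixed hex string (block numbers, log data) into an int."""
    return int(value, 16)


def wei_to_unit(value: int, decimals: int = 18) -> float:
    """scale a raw on-chain amount by the token's decimals (assumed 18 here)."""
    return float(Decimal(value) / Decimal(10 ** decimals))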

we can now fire the indexer and start retrieving the historical transfer events data on ethereum:

> indexer -e
(...)
ℹ️ Indexing transfer events between blocks 16780001 and 16809808...
loading blocks 16780001 to 16785001
ℹ️ Found 30 transfer events between blocks 16780001 and 16785001.
loading blocks 16785001 to 16790001
ℹ️ Found 26 transfer events between blocks 16785001 and 16790001.
loading blocks 16790001 to 16795001
(...)
ℹ️ Results were saved at ./output/raw_data_2023-03-11_21-15-52.json.

verifying the data

once all the blocks are indexed (it might take some time, depending on the token), we should run a sanity check against etherscan (a great tool for json exploration is jq):

> cat <result json file> | jq . | tail  
a transfer event entry.

nice, the last entry of that particular result file matches etherscan.

this is the bulk of the data to be ingested into our database (i.e., cached). from here, new block data can be added on top of this historical data whenever a query is run.


🦅 preparing the data prior to db ingestion

to prepare the data, let’s write a script that processes the transfer events into balances by wallet:

src/utils/data_processing.py
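the core of that script is a single pass over the raw logs: each transfer debits the sender and credits the receiver. a sketch, assuming the entries keep the eth_getLogs shape shown earlier:

# sketch: fold raw transfer logs into balances by wallet (field names follow the eth_getLogs response)
from collections import defaultdict


def process_transfers(events: list, decimals: int = 18) -> dict:
    """aggregate transfer events into a wallet -> balance mapping."""
    balances = defaultdict(float)
    for event in events:
        # topics[1] and topics[2] hold the 32-byte padded 'from' and 'to' addresses
        sender = '0x' + event['topics'][1][-40:]
        receiver = '0x' + event['topics'][2][-40:]
        amount = int(event['data'], 16) / 10 ** decimals
        balances[sender] -= amount
        balances[receiver] += amount
    # drop the zero address used on mints/burns
    balances.pop('0x' + '0' * 40, None)
    return dict(sorted(balances.items(), key=lambda item: item[1]))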

running it:

> indexer -p <json result file>
ℹ️ Writing balances to ./output/balances_*.json

> cat ./output/balances_*.json
(...)
"0xd2b91e16b8bed2b643e600cbeb88f4ebb1ca1727": 10250.336445767043,
"0x716a42b8b7a89ccfedb90d42036166846c38ac75": 10251.214376623593,
"0xa957498ad9744f5c827056988b2c948c09a8d722": 10260.791115637423,
"0x5616721ca2a299f36e9ba02bf2770961c3899c43": 10261.38906964618,
"0x040baa573b9dab3143425d7e0d11916961d385bf": 10269.571681498099,
(...)

sweet, it works: we have the historical balance data by wallet.


🦅 setting up the mongodb database

the next step is to feed the balance data into the database so that it can be accessed by the api we are building.

we will be using a document database, mongodb, in which a record is a document composed of field and value pairs (similar to a dict or a json object). mongodb stores data records as bson, a binary representation of json.

mongodb stores documents inside collections (something like tables in sql/relational databases), and each collection is assigned an immutable uuid (universally unique identifier).
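in our case, each record in the balances collection will be a small document holding a wallet address and its balance, e.g.:

# illustrative shape of one document in the balances collection
document = {
    'wallet': '0xe9f3bcdfa00f040bb5436601a476f780bb5af16a',
    'balance': 10227.977269319452,
}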

setting up a local mongodb

when developing software, a local setup is necessary prior to the production (cloud) deployment.

start by installing mongodb following these instructions. to install a local gui, download mongodb compass.

to start the service, run:

brew services start mongodb-<version>

the default connection is at mongodb://localhost:27017, and you can check whether the process is running with:

ps aux | grep -v grep | grep mongod

a mongodb shell written in javascript can be launched with:

mongosh

loading the balance data into the local db instance

let’s run a python script that loads the balance data into the local database (note that this should only be run once; all later db handling is done by methods from the server/ module):

src/utils/db_processing.py
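under the hood this is essentially a bulk insert with pymongo; a minimal sketch (database and collection names follow the atlas screenshot later in this post):

# minimal sketch of loading the balances json into a local mongodb with pymongo
import json
import sys

from pymongo import MongoClient


def load_balances(balances_file: str) -> None:
    """insert every wallet/balance pair from the json file into the balances collection."""
    with open(balances_file) as f:
        balances = json.load(f)  # {"0xwallet...": balance, ...}

    client = MongoClient('mongodb://localhost:27017')
    collection = client['balances']['balances']
    collection.insert_many(
        [{'wallet': wallet, 'balance': balance} for wallet, balance in balances.items()]
    )


if __name__ == '__main__':
    load_balances(sys.argv[1])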

running indexer -d <balances file.json> populates the balances collection:

database view from the mongodb compass gui.

🦅 creating and deploying an api service

fastapi is a high-performance framework built on top of asyncio, a python library that implements concurrent code using the async/await syntax.

💡 a function defined with async is a coroutine. it can be paused internally, allowing the program to execute in increments (suspending and resuming execution as it awaits).

the first step to deploy our api is to create an app and call it with uvicorn (an asynchronous server gateway interface web server implementation for python):

uvicorn.run("src.server.api:app", host=HOST, port=PORT, reload=True):

src/server/api.py
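a sketch of what this module boils down to (the module layout and names are assumptions):

# sketch of src/server/api.py: create the fastapi app and expose a runner for uvicorn
import uvicorn
from fastapi import FastAPI

from src.server import routes  # the router defined in src/server/routes.py

HOST, PORT = '0.0.0.0', 80

app = FastAPI(title='🪙 Token indexer and API')
app.include_router(routes.router)


def run_api() -> None:
    """called by `indexer -a` to serve the api locally."""
    uvicorn.run('src.server.api:app', host=HOST, port=PORT, reload=True)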

then, we define the api routes:

src/server/routes.py
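the routes are thin async endpoints that delegate to the database helpers; a sketch (endpoint paths and helper names are assumptions):

# sketch of the routes module: async endpoints delegating to the database helpers
from fastapi import APIRouter

from src.server import database

router = APIRouter()


@router.get('/balance/{wallet}')
async def get_balance(wallet: str) -> dict:
    """return the indexed balance for a given wallet address."""
    return {'result': await database.retrieve_balance(wallet.lower())}


@router.get('/top')
async def get_top_holders() -> dict:
    """return the top 100 token holders by balance."""
    return {'result': await database.retrieve_top_holders(limit=100)}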

💡 futures represent the eventual result of an asynchronous task that may or may not have completed yet.

and, finally, the database model methods:

server/database.py
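these helpers talk to mongodb asynchronously; a sketch using motor, mongodb’s asyncio driver (an assumption — the repo may use a different driver):

# motor-based sketch of the database helpers in server/database.py
import os

from motor.motor_asyncio import AsyncIOMotorClient

client = AsyncIOMotorClient(os.environ.get('MONGODB_URI', 'mongodb://localhost:27017'))
collection = client['balances']['balances']


async def retrieve_balance(wallet: str) -> list:
    """fetch the balance document for a single wallet."""
    document = await collection.find_one({'wallet': wallet}, {'_id': 0})
    return [document] if document else []


async def retrieve_top_holders(limit: int = 100) -> list:
    """fetch the wallets with the largest balances, in descending order."""
    cursor = collection.find({}, {'_id': 0}).sort('balance', -1).limit(limit)
    return await cursor.to_list(length=limit)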

🦅 testing the api

to spin up the api locally, run indexer -a and open http://0.0.0.0:80 in your browser. you should see this:

if you open http://0.0.0.0:80/docs, you should see this:

our api's documentation.

you can test this api through the browser, using curl, or with the indexer cli.

for instance, fetching a wallet’s balance:

> indexer -b 0xe9f3bcdfa00f040bb5436601a476f780bb5af16a

ℹ️ {'result': [{'wallet': '0xe9f3bcdfa00f040bb5436601a476f780bb5af16a', 'balance': 10227.977269319452}]}

or fetching the top 100 token holders:

> indexer -t

ℹ️ {"result":[{"wallet":"0xf894fea045eccb2927e2e0cb15c12debee9f2be8","balance":8304434869.4720545},{"wallet":"0xc96f20099d96b37d7ede66ff9e4de59b9b1065b1","balance":6250026548.525733},{"wallet":"0x563b1ae9717e9133b0c70d073c931368e1bd86e5","balance":3631605454.7259746},
(...)

🦅 deployment to production

we can now deploy the api in the cloud using vercel, with mongodb atlas as the managed database.

mongodb atlas

create an account and a database, then upload the data:

balances.balances at mongodb atlas.

next, add the MONGODB_URI to .env:

MONGODB_URI="mongodb+srv://<username>:<password>@<url>/<db>?retryWrites=true&w=majority"

since the instance will be short-lived (only to illustrate this project), we won’t bother with authentication. however, if you are adapting this project for a production api, you should look at adding jwt authentication to your fastapi app (or some alternative auth method).

another detail is atlas’ ip access list: for this project, we allow access from 0.0.0.0/0 (so vercel can reach the database).

vercel

to deploy to vercel, add a vercel.json config file to ./:

{
  "builds": [
    {"src": "/src/api.py", "use": "@vercel/python"}
  ],
  "routes": [
    {"src": "/(.*)", "dest": "src/api.py"}
  ]
}

then run:

> vercel login
> vercel .

inside the vercel project’s settings, add all the environment variables:

and voilà:

our api in the cloud.


◻️ motherofbots.eth
