today i go over an MVP for a scalable event scanner for ethereum, built by indexing and parsing block events.
this is step 0 for building distributed systems that allow training machine learning models on on-chain data (e.g., high-frequency trading with deep learning) and other shenanigans.
this is one data engineering approach: for when you consciously decide to build a data lake and want to learn more about how blockchains emit logs. this project can be built in one day. for a rust agent that performs (websockets) monitoring on cexes, and for other approaches, check go-outside-labs’ kiddor-searcher-bot.
leverage python to build a cli tool that indexes transfer events for a particular token through eth_getLogs,
prepare the data to be ingested and loaded into a simple nosql database (mongodb),
build and deploy a restful api for fast balance and ownership statistics retrieval with python’s fastapi (while looking at asynchronous performance).
how to index blockchain(s) data into a graph database such as neo4j, high-performance nosql databases such as apache cassandra, or apache arrow data structures
an in-depth exploration of google’s (nosql) bigtable projects (e.g., by the blockchain-etl group), and apache airflow dags for exporting, loading, and parsing data
an in-depth look at state-of-the-art distributed sql query engines, such as trino
how to implement distributed event streaming (e.g., with apache kafka, amazon sqs and sns, or rabbitmq)
how to index local nodes (e.g., with a local running execution client, as opposed to using external rpc urls)
how to simultaneously index other tokens and alt chains
how to deploy this pipeline through kubernetes and terraform as it scales to over 100 million entries
in this post, we are building a data engineering tool to index and process block data on ethereum. by the end, you should have a full api deployed in the cloud, plus a local cli tool:
to get started, either clone my code or write your own files as we review this post.
create a venv, using either virtualenv, pipenv, or poetry.
because of some of the dependencies in this code, we will be developing in a python 3.9 environment (install here if you don’t have that version on disk):
> virtualenv -p /usr/local/bin/python3.9 venv
> source venv/bin/activate
> make install_dep
now, create a .env file and add an RPC_PROVIDER_URL to connect to ethereum mainnet nodes (for example, from this list):
> cp .env.example .env
> vim .env
this is what the final project looks like (you get this by running tree):
> make install
> indexer -h
🪙 Token indexer and API.
(...)
the entry point of this project is through src/main.py:
before we review the first class in this project, TokenIndexer (which retrieves the token’s historical transfer event data from the ethereum mainnet), we need to decide which token we would like to index.
in this example, i am looking at an erc-20 token, the doge nft (dog) token. feel free to add your favorite token’s contract in the .env file:
TOKEN_CONTRACT = 0xBAac2B4491727D78D2b78815144570b9f2Fe8899
by the way, all environment variables are retrieved by a function defined inside src/utils/os_utils.py, shown below. note that the last method in the file, send_rpc_request(), issues POST requests to ethereum’s json-rpc api:
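for reference, a minimal sketch of what that helper could look like (assuming the requests package; the actual implementation in the repo may differ slightly):

import requests


def send_rpc_request(url: str, method: str, params: list = None) -> dict:
    """issue a POST request to an ethereum json-rpc endpoint."""
    payload = {
        'jsonrpc': '2.0',
        'method': method,
        'params': params or [],
        'id': 1,
    }
    response = requests.post(url, json=payload, timeout=30)
    response.raise_for_status()
    return response.json().get('result')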
we also need the abi of the contract.
since the doge nft token contract is behind a proxy, to be able to parse the transactions (e.g., the Transfer() events), we need to retrieve the abi of the implementation contract behind the proxy (while still using the original contract’s address when querying the logs).
how do you extract the abi from a contract?
(remember that the abi, the contract’s application binary interface, is the interface that specifies how to interact with this specific contract, including method names, parameters, constants, data structures, event types, etc.)
in this case, since the contract is verified on etherscan, we can call the endpoint below (which needs an api key that can be created here):
https://api.etherscan.io/api?
module=contract&
action=getabi&
address=0x7b0fce54574d9746414d11367f54c9ab94e53dca&
apikey=<etherscan api>
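if you prefer to do this programmatically, here is a hypothetical snippet that fetches the abi and saves it into ./abi/ (it assumes an ETHERSCAN_API_KEY variable in your .env, and the output filename is just an example):

import json
import os

import requests

# implementation contract behind the proxy (verified on etherscan)
IMPLEMENTATION_ADDRESS = '0x7b0fce54574d9746414d11367f54c9ab94e53dca'

params = {
    'module': 'contract',
    'action': 'getabi',
    'address': IMPLEMENTATION_ADDRESS,
    'apikey': os.environ['ETHERSCAN_API_KEY'],
}
data = requests.get('https://api.etherscan.io/api', params=params, timeout=30).json()

# etherscan returns the abi as a json-encoded string inside 'result'
abi = json.loads(data['result'])

with open('./abi/doge_nft_abi.json', 'w') as f:
    json.dump(abi, f, indent=2)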
for non-verified contracts, you can try tools like this one.
paste the abi into a json file inside ./abi/.
finally, add the path for this file in the .env file:
TOKEN_CONTRACT_ABI = ./abi/<abi file.json>
to allow users to access transaction event data, the ethereum evm keeps event logs for every block’s transactions, which can be retrieved with the eth_getLogs json-rpc method. in other words, any time a transaction is mined, its event logs are emitted.
we are interested in Transfer() events, which are emitted whenever assets are transferred between two addresses. an event signature is used to identify this specific event log: it’s the keccak-256 hash of the event’s canonical declaration, Transfer(address,address,uint256):

keccak256("Transfer(address,address,uint256)") = 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef
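you can double-check this hash yourself with web3.py (a quick sketch, assuming the web3 package is installed):

from web3 import Web3

# keccak-256 hash of the canonical event declaration
signature = 'Transfer(address,address,uint256)'
print(Web3.keccak(text=signature).hex())
# -> 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef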
this is an example of the data passed when calling eth_getLogs:
{
'jsonrpc': '2.0',
'method': 'eth_getLogs',
'params':
[
{
'address':'0xBAac2B4491727D78D2b78815144570b9f2Fe8899',
'fromBlock': '0x1',
'toBlock': '0x1389',
'topics': [
'0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
]
}
],
'id': 1
}
and this is an example of a response:
{
'address': '0xbaac2b4491727d78d2b78815144570b9f2fe8899',
'blockHash':'0x543c387ba2d9b8173cba357d51f2f7329fcd02d475d9e116d749f8cdc203d763',
'blockNumber': '0xc85a3e',
'data':'0x000000000000000000000000000000000000000036d5011c02b1b33ec5840000',
'logIndex': '0x25f',
'removed': False,
'topics': ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef','0x0000000000000000000000000000000000000000000000000000000000000000','0x000000000000000000000000f5c27c6fe782cbb5c85989ea3e75754748153459'],
'transactionHash':'0x5e5d83fd2d43b3f7aab6f2b4c21a69dd6d804f59711dec13198d4792a4e5e158',
'transactionIndex': '0x199'
}
back to our TokenIndexer class, we can see how eth_getLogs is being leveraged to retrieve the target transfer events:
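in essence, the core loop boils down to something like the sketch below. this is not the repo’s exact implementation: the method name get_transfer_logs() and the 5000-block chunk size are illustrative (although the chunking matches the output further down), and it reuses the send_rpc_request() helper sketched earlier:

class TokenIndexer:
    """retrieve historical Transfer() events for a token through eth_getLogs."""

    TRANSFER_TOPIC = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'

    def __init__(self, rpc_url: str, contract_address: str, chunk_size: int = 5000):
        self.rpc_url = rpc_url
        self.contract_address = contract_address
        self.chunk_size = chunk_size

    def get_transfer_logs(self, from_block: int, to_block: int) -> list:
        """walk the block range in chunks, calling eth_getLogs for each one."""
        logs = []
        for start in range(from_block, to_block, self.chunk_size):
            end = min(start + self.chunk_size, to_block)
            params = [{
                'address': self.contract_address,
                'fromBlock': hex(start),
                'toBlock': hex(end),
                'topics': [self.TRANSFER_TOPIC],
            }]
            logs += send_rpc_request(self.rpc_url, 'eth_getLogs', params)
        return logs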
note that this class uses math methods that are defined in the file below:
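for illustration, helpers of that flavor could be as simple as the ones below (the names are hypothetical, and the token is assumed to use 18 decimals):

def hex_to_int(value: str) -> int:
    """convert a hex string returned by the json-rpc api into an integer."""
    return int(value, 16)


def wei_to_token(value: int, decimals: int = 18) -> float:
    """convert a raw uint256 amount into a human-readable token balance."""
    return value / 10 ** decimals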
we can now fire the indexer and start retrieving the historical transfer events data on ethereum:
> indexer -e
(...)
ℹ️ Indexing transfer events between blocks 16780001 and 16809808...
loading blocks 16780001 to 16785001
ℹ️ Found 30 transfer events between blocks 16780001 and 16785001.
loading blocks 16785001 to 16790001
ℹ️ Found 26 transfer events between blocks 16785001 and 16790001.
loading blocks 16790001 to 16795001
(...)
ℹ️ Results were saved at ./output/raw_data_2023-03-11_21-15-52.json.
once all the blocks are indexed (it might take some time, depending on the token), we should run a sanity check by comparing the data against etherscan (a great tool for JSON exploration is jq):
> cat <result json file> | jq . | tail
nice, the last entry of that particular result file does match with etherscan.
this is the bulk of the data to be ingested into our database (i.e., cached). from here, every new block’s info can be added on top of this historical data whenever a query is run.
to prepare the data, let’s write a script that processes the transfer events into balances by wallet:
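the gist of the processing (a simplified sketch of the idea, not the exact script in the repo) is to decode the from/to addresses from the indexed topics and the transferred value from the data field, then accumulate the deltas per wallet. it reuses the hypothetical helpers above:

from collections import defaultdict


def process_transfers(logs: list) -> dict:
    """aggregate raw Transfer() logs into a balance per wallet."""
    balances = defaultdict(float)
    for log in logs:
        # topics[1] and topics[2] are the indexed from/to addresses, left-padded to 32 bytes
        sender = '0x' + log['topics'][1][-40:]
        receiver = '0x' + log['topics'][2][-40:]
        value = wei_to_token(hex_to_int(log['data']))
        # note: mints show up as transfers from the zero address
        balances[sender] -= value
        balances[receiver] += value
    return dict(balances)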
running it:
> indexer -p <json result file>
ℹ️ Writing balances to ./output/balances_*.json
> cat ./output/balances_*.json
(...)
"0xd2b91e16b8bed2b643e600cbeb88f4ebb1ca1727": 10250.336445767043,
"0x716a42b8b7a89ccfedb90d42036166846c38ac75": 10251.214376623593,
"0xa957498ad9744f5c827056988b2c948c09a8d722": 10260.791115637423,
"0x5616721ca2a299f36e9ba02bf2770961c3899c43": 10261.38906964618,
"0x040baa573b9dab3143425d7e0d11916961d385bf": 10269.571681498099,
(...)
sweet, it works: we have the historical balance data by wallet.
the next step is to feed the balance data into the database so that it can be accessed by the api we are building.
we will be using a document database, mongodb, in which a record is a document composed of field and value pairs (similar to a dict or a json object). mongodb stores data records as bson, a binary representation of json.
mongodb stores documents inside collections (something like tables in sql/relational databases), and each collection is assigned an immutable uuid (universally unique identifier).
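for instance, in our case each record will look something like the document below (mongodb automatically adds an _id field on insertion):

{ "wallet": "0xd2b91e16b8bed2b643e600cbeb88f4ebb1ca1727", "balance": 10250.336445767043 }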
when developing software, a local setup is necessary prior to the production (cloud) deployment.
start by installing mongodb following these instructions. to install a local gui, download mongodb compass.
to start the service, run:
brew services start mongodb-<version>
the default connection is at mongodb://localhost:27017, and you can check whether the process is running with:
ps aux | grep -v grep | grep mongod
a mongodb shell written in javascript can be launched with:
mongosh
let’s run a python script that loads the balance data into the local database (note that this should only be run once; all the later db handling is done by methods from the server/ module):
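the loading itself can be as simple as the sketch below (assuming pymongo and a local mongod at the default port; the database name is illustrative, the collection name matches the one shown next):

import json

from pymongo import MongoClient


def load_balances(balances_file: str) -> None:
    """insert the processed balances into the 'balances' collection."""
    with open(balances_file) as f:
        balances = json.load(f)

    client = MongoClient('mongodb://localhost:27017')
    collection = client['token_indexer']['balances']
    collection.insert_many(
        [{'wallet': wallet, 'balance': balance} for wallet, balance in balances.items()]
    )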
running indexer -d <balances file.json> populates the balances collection:
fastapi is a high-performance framework built on top of asyncio, a python library that implements concurrent code using the async/await syntax.
💡 a function defined with async def is a coroutine. it can be paused internally, allowing the program to execute in increments (suspending and resuming execution).
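a toy example:

import asyncio


async def fetch_balance(wallet: str) -> float:
    # pretend this is an i/o-bound call (e.g., a database query)
    await asyncio.sleep(0.1)
    return 42.0


async def main():
    # both coroutines run concurrently on the event loop
    results = await asyncio.gather(fetch_balance('0xaaa'), fetch_balance('0xbbb'))
    print(results)


asyncio.run(main())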
the first step to deploy our api is to create an app and call it with uvicorn (an asynchronous server gateway interface web server implementation for python):

uvicorn.run("src.server.api:app", host=HOST, port=PORT, reload=True)
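the app itself is only a few lines; a minimal sketch of src/server/api.py (the routes module import is illustrative):

from fastapi import FastAPI

from src.server import routes  # hypothetical module holding the routes shown next

app = FastAPI(title='Token indexer API')
app.include_router(routes.router)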
then, we define the api routes:
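a sketch of what these routes could look like (endpoint paths and database helpers are illustrative, loosely based on the cli calls shown later):

from fastapi import APIRouter

from src.server import database  # hypothetical module with the db methods below

router = APIRouter()


@router.get('/balance/{wallet}')
async def get_balance(wallet: str) -> dict:
    """return the indexed balance for a given wallet address."""
    return {'result': await database.get_balance(wallet.lower())}


@router.get('/top')
async def get_top_holders(limit: int = 100) -> dict:
    """return the top token holders sorted by balance."""
    return {'result': await database.get_top_holders(limit)}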
💡 futures represent the result of a task that may or may not have completed yet.
and, finally, the database model methods:
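one option for an async database layer is motor, mongodb’s asyncio python driver (not necessarily what the repo uses); a sketch under that assumption, reusing the collection created earlier:

import os

from motor.motor_asyncio import AsyncIOMotorClient

client = AsyncIOMotorClient(os.environ.get('MONGODB_URI', 'mongodb://localhost:27017'))
collection = client['token_indexer']['balances']


async def get_balance(wallet: str) -> list:
    """fetch the balance document(s) for a wallet."""
    cursor = collection.find({'wallet': wallet}, {'_id': 0})
    return await cursor.to_list(length=10)


async def get_top_holders(limit: int = 100) -> list:
    """fetch the wallets with the largest balances."""
    cursor = collection.find({}, {'_id': 0}).sort('balance', -1).limit(limit)
    return await cursor.to_list(length=limit)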
to spin up the api locally, run indexer -a and open http://0.0.0.0:80 in your browser. you should see this:

if you open http://0.0.0.0:80/docs, you should see this:
you can test this api through the browser, using curl, or with the indexer cli.
for instance, fetching a wallet’s balance:
> indexer -b 0xe9f3bcdfa00f040bb5436601a476f780bb5af16a
ℹ️ {'result': [{'wallet': '0xe9f3bcdfa00f040bb5436601a476f780bb5af16a', 'balance': 10227.977269319452}]}
or fetching top 100 token holders:
> indexer -t
ℹ️ {"result":[{"wallet":"0xf894fea045eccb2927e2e0cb15c12debee9f2be8","balance":8304434869.4720545},{"wallet":"0xc96f20099d96b37d7ede66ff9e4de59b9b1065b1","balance":6250026548.525733},{"wallet":"0x563b1ae9717e9133b0c70d073c931368e1bd86e5","balance":3631605454.7259746},
(...)
we can now deploy the api in the cloud using vercel, with mongodb atlas hosting the database.
create an account and a database, then upload the data:
next, add the MONGODB_URI to .env:

MONGODB_URI="mongodb+srv://<username>:<password>@<url>/<db>?retryWrites=true&w=majority"
since the instance will be short-lived (only to illustrate this project), we won’t bother with authentication. however, if you are adapting this project for a production api, you want to look at adding jwt authentication into your fastapi (or some alternative auth method).
another detail is atlas’s ip access list: for this project, we will allow access from 0.0.0.0/0 (so vercel can reach the database).
to deploy to vercel, add a vercel.json config file to ./:
{
"builds": [
{"src": "/src/api.py", "use": "@vercel/python"}
],
"routes": [
{"src": "/(.*)", "dest": "src/api.py"}
]
}
then run:
> vercel login
> vercel .
inside the vercel project’s settings, add all the env variables:
and voilà: