Building an arbitrage bot: プール価格の効率的な読み取り (記事 2/n)

引用元:

この記事は、アービトラージbotを構築するシリーズの一部です。この記事では、Uniswap V2やSushiSwapなどのV2互換DEXのプールの価格を読み取る実用的で現実的な方法を紹介します。

ファクトリペアの読み取り

前回の記事では、単一のUniswap V2ペアのリザーブを読み取る方法を見ました。しかし、Uniswapには10万個以上のペアがあり、他のV2互換DEXにはさらに多くのペアがあります。1つずつ価格を読み取ることは不可能であり、すべてを一度に読み取る方法が必要です。

ペアのリザーブを読み取る前に、ペアコントラクトのアドレスを知る必要があります。

ファクトリーコントラクトイベント

幸いにも、Uniswapのチームはこの点を考慮しており、V2のファクトリーコントラクトが新しいペアが作成されるたびにイベントを発行するようになっています。こちらがファクトリーコントラクトのSolidityコードの一部です:

function createPair(address tokenA, address tokenB) external returns (address pair) {
  // [...]
  // 新しいUniswapV2Pairコントラクトをデプロイし、そのアドレスを`pair`に格納
  assembly {
      pair := create2(0, add(bytecode, 32), mload(bytecode), salt)
  }
  // [...]
  // PairCreatedイベントを発行
  emit PairCreated(token0, token1, pair, allPairs.length);
}

これを単純に取得するだけで、スマートコントラクトのPairCreatedイベントをフェッチできます。

イベントの読み取り これは、ファクトリーコントラクトのイベントを読み取るためのPython + web3.pyを使ったサンプルコードです:

from web3 import Web3

# ローカルノードに接続
w3 = Web3(Web3.HTTPProvider('https://mainnet.infura.io/v3/<YOUR_INFURA_PROJECT_ID>'))

# コントラクトアドレスを定義
contract_address = '0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f' # Uniswap V2ファクトリー

# コントラクトABIを定義
factory_abi = [
  {
     "anonymous": False,
     "inputs": [
        {"indexed": True,"internalType": "address","name": "token0","type": "address"},
        {"indexed": True,"internalType": "address","name": "token1","type": "address"},
        {"indexed": False,"internalType": "address","name": "pair","type": "address"},
        {"indexed": False,"internalType": "uint256", "name": "","type": "uint256"}
     ],
     "name": "PairCreated",
     "type": "event"
  }
]

# コントラクトをインスタンス化
factory_contract = w3.eth.contract(address=contract_address, abi=factory_abi)

# コントラクトからイベントを取得
events = factory_contract.events.PairCreated().createFilter(fromBlock='0x0', toBlock='latest').get_all_entries()
print(f'Found {len(events)} events')

Infuraの無料ノードの制限により、このコードは1万件以上のイベントが返された場合には動作しません。これは問題です。ファクトリーコントラクトは20万件以上のイベントを発行しているためです。

これを解決するには、クエリを失敗したときに再帰的に小さなサブクエリに分割し、結果をマージする関数を実装できます。

# 再帰的なイベント取得関数
def getEventsRecursive(contract, _from, _to):
  # Infuraは1度に多すぎるブロック(1万)をクエリするとエラーを投げるので、そうなったらクエリを2つのサブクエリに分割する。
  try:
    events = contract.events.PairCreated().createFilter(fromBlock=_from, toBlock=_to).get_all_entries()
    print("Found ", len(events), " events between blocks ", _from, " and ", _to)
    return events
  except ValueError: 
    print("Too many events found between blocks ", _from, " and ", _to)
    midBlock = (_from + _to) // 2
    return getEventsRecursive(contract, _from, midBlock) + getEventsRecursive(contract, midBlock + 1, _to)            

この関数は次のように呼び出すことができます:

events = getEventsRecursive(factory_contract, 0, w3.eth.blockNumber)

これはほぼ20万件のイベントを返します。この多くのペアは価値のないトークンのものであるため、後でフィルタリングが必要です。

この関数の実行には約5分かかります! 良いニュースは、この関数を頻繁に実行する必要はないことです。24時間ごとにバックグラウンドで1回実行すれば十分です。

V2 互換の DEX

Uniswap V2ファクトリーのペアを読み取る方法がわかったので、他のV2互換DEXのペアを読み取るのは簡単です。ほとんどのV2互換DEXは、ファクトリーコントラクトを含むUniswap V2のコードの大部分をそのままコピーしています。

他のDEXのペアをフェッチするには、ファクトリーコントラクトアドレスとABIを変更するだけです。たとえばSushiSwap:

[...]
contract_address = '0xC0AEe478e3658e2610c5F7A4A2E1777cE9e4f2Ac' # SushiSwapファクトリー メインネット
[...]
events = factory_contract.events.PairCreated().createFilter(fromBlock='0x0', toBlock='latest').get_all_entries() print(f'Found {len(events)} events')

これは5秒で次の出力を返します:

Found 3692 events

Uniswapに比べてペア数ははるかに少ないことがわかります。 再帰的な二分探索クエリは不要でした。

その他のDEXのファクトリーアドレス

{
"uniV2": {
"uniswapV2": {
"factory": "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f"
},
"sushiswapV2": {
"factory": "0xC0AEe478e3658e2610c5F7A4A2E1777cE9e4f2Ac"
},
"shibaswapV2": {
"factory": "0x115934131916C8b277DD010Ee02de363c09d037c"
},
"croswapV2": {
"factory": "0x9DEB29c9a4c7A88a3C0257393b7f3335338D9A9D"
},
"convergenceV2": {
"factory": "0x4eef5746ED22A2fD368629C1852365bf5dcb79f1"
},
[...]
}
}

リザーブの読み取り

追跡する大量のペアリストが得られたので、それらのトークンリザーブを読み取る必要があります。残念ながら、リザーブはファクトリーコントラクトには格納されておらず、ペアコントラクト自体に格納されています。つまり、各ペアコントラクトのgetReserves()関数を効率的に呼び出す方法が必要です。

ヘルパーコントラクト

何千ものRPCコールを実行する代わりに、事前に自分自身でデプロイした特別なスマートコントラクトに1つの大規模な呼び出しを行うことができます。

呼び出しでペアアドレスの配列を提供し、コントラクト実行中に、それ自体が各ペアコントラクトのgetReserves()関数を呼び出し、結果を配列で返します。これは、すべてのロジックがEVMのコンテキスト内にパックされるため、ネットワークのオーバーヘッドを回避でき、はるかに効率的です。

コントラクトは次のとおりです:

//SPDX-License-Identifier: UNLICENSED
pragma solidity ^0.8;

// ペアコントラクトのインターフェイスを定義、呼び出したい関数を含める
interface IUniswapV2Pair {
  function getReserves() external view returns (uint112 reserve0, uint112 reserve1, uint32 blockTimestampLast);
}

// バッチクエリコントラクト  
contract FlashBotsUniswapQuery {

  function getReservesByPairs(IUniswapV2Pair[] calldata _pairs) external view returns (uint256[3][] memory) {
    
    uint256[3][] memory result = new uint256[3][](_pairs.length);
    
    for (uint i = 0; i < _pairs.length; i++) {
       (result[i][0], result[i][1], result[i][2]) = _pairs[i].getReserves();
    }
    
    return result;
  }

}

このスマートコントラクトは、Flashbotsのシンプルなアービトラージボットリポジトリにほぼ同じ形式で見つかります。

アセンブリで最適化されたバージョンは記事の最後にありますが、速度向上は重要ではなく、アセンブリ版は平坦化されたリザーブ配列を返す代わりに、Solidityバージョンの2次元配列を返します。

このコントラクトはメインネットにデプロイする必要があり(Remixなどを使用)、アドレスは以下のコードのqueryContractAddressとして保存する必要があります。

このコントラクトの使用は非常に簡単です。呼び出しコードは次のとおりです:

#  PairCreatedイベントのリストを取得
# [...]
events = getEventsRecursive()

# イベントを辞書のリストに変換
pairDataList = [
  {
     'token0': e['args']['token0'],
     'token1': e['args']['token1'],
     'pair': e['args']['pair']
  } for e in events]

# Web3pyを使用してデプロイしたコントラクトのgetReservesByPairs()を呼び出す
queryContractAddress = "0x6c618c74235c70DF9F6AD47c6b5E9c8D3876432B" 

queryAbi = [
  {
     "inputs": [
        {"internalType": "contract IUniswapV2Pair[]","name": "_pairs","type": "address[]"},
     ],
     "name": "getReservesByPairs",
     "outputs": [
       {"internalType": "uint256[3][]", "name": "","type": "uint256[3][]"}
     ],
     "stateMutability": "view","type": "function"
  }
]

# コントラクトオブジェクトを作成
queryContract = w3.eth.contract(address=queryContractAddress, abi=queryAbi)

# ペア辞書をアドレスリストに整形
pairAddrList = [p['pair'] for p in pairDataList]

# 次のコードは20万件のUniswap V2ペアでは失敗する
reserves = queryContract.functions.getReservesByPairs(pairAddrList).call()

このスクリプトは、次のような結果を返す必要があります:

[[156362, 7, 1637535630], [1108183983306794770482713, 187818480, 1682759063],...]

アドレス0x6c618c74235c70DF9F6AD47c6b5E9c8D3876432Bは、Remixで個人的にデプロイしたコントラクトのアドレスです。ガス料金を支払うことなく、独自のコントラクトのデプロイを回避したい場合はそれを使用できます。

このスクリプトは、ペア数がはるかに少ないUniswap V2の小さなフォークでのみ、as-isで機能する可能性が高いことに注意してください。これは、実行コントラクト呼び出しの期間制限があるためです。Infuraの場合、その制限は5秒の実行です。

この制限がなくても、ペアのリザーブを取得するのに5秒もかかる遅延は長すぎます。 イーサリアムでは12秒ごとに新しいブロックが生成されます。 ノードプロバイダーの遅延、最終トランザクションバンドル送信の遅延、および後の記事で説明する機会検索アルゴリズムに必要な計算時間を考慮すると、この部分を最適化するための対策が必要です。

リクエストの最適化

非同期リクエスト

遅延/タイムアウトの問題を解決する簡単な方法は、ノードにフィードするペアアドレスリストを小さなチャンクにスライスし、並列して非同期でノードに送信することです。

最新のWeb3pyバージョンを使用すると、ノードへのリクエストを行うときに、コードの大幅な変更なしに非同期プログラミングを使用できます。 これを行う方法は次のとおりです:

# 追加のインポート
import asyncio
from web3 import AsyncHTTPProvider 
from web3.eth import AsyncEth
# Jupyter notebookを使う場合はnest_asyncio.apply()などの少しの変更が必要

# 非同期のリザーブ取得関数を定義
async def getReservesAsync(pairDataList, chunkSize=1000):

  # 非同期Web3プロバイダーを作成
  w3Async = Web3(AsyncHTTPProvider(NODE_URI), modules={'eth': (AsyncEth)}) 
  
  # コントラクトオブジェクトを作成
  queryContract = w3Async.eth.contract(address=queryContractAddress, abi=queryAbi)

  # ペアアドレスのチャンクリストを作成
  chunks = [[pair["pair"] for pair in pairDataList[i:i + chunkSize]] for i in range(0, len(pairDataList), chunkSize)]

  # 非同期タスクを作成
  tasks = [queryContract.functions.getReservesByPairs(pairs).call() for pairs in chunks]
  
  # タスクを並列に実行
  results = await asyncio.gather(*tasks)

  # 結果を返す
  return results

# 関数を呼び出す  
reserves = asyncio.run(getReservesAsync(pairDataList))

このコードは、ペアごとにリザーブの大きなリストを返す必要があります。結果は次のようになります:

[[reserve0, reserve1, timestamp], [...], ...]

このコードは非常にうまく機能しますが、単一ノードへの最大同時リクエスト数や24時間リクエスト制限など、他のAPIレート制限がヒットします。

非同期リクエスト

これらの追加の制限を回避するには、このアプローチを新しいトリックで完了させる必要があります。それは、リクエスト負荷をバランスするために、複数のノードに並列してクエリを送信することです。

ノードはAsyncHTTPProviderオブジェクトでラップされます。 スクリプトの始めに行われる変更は次のとおりです:

# ノードURIの定義
NODE_URIS = [
  # Infuraノード
  "https://mainnet.infura.io/v3/INFURA_PROJECT_ID",
  "https://mainnet.infura.io/v3/INFURA_PROJECT_ID",
  "https://mainnet.infura.io/v3/INFURA_PROJECT_ID",

  # Alchemyノード
  "https://eth-mainnet.alchemyapi.io/v2/ALCHEMY_PROJECT_ID",

  # Quiknodeノード
  "https://eth-mainnet.quiknode.pro/QUICKNODE_PROJECT_ID/",

  #...
]

providerList = [Web3(AsyncHTTPProvider(uri), modules={'eth': (AsyncEth)}) for uri in NODE_URIS]

getReservesAsync()関数は、.call()リクエストを複数のノードに送信し、最初に受信した結果を返すように編集されています。

async def getReservesParallel(pairDataList, providers, chunkSize=1000):

  # コントラクトオブジェクトを作成
  contracts = [provider.eth.contract(address=queryContractAddress, abi=queryAbi) for provider in providers]

  # ペアアドレスのチャンクリストを作成
  chunks = [[pair["pair"] for pair in pairDataList[i:i + chunkSize]] for i in range(0, len(pairDataList), chunkSize)]

  # チャンクをプロバイダーにラウンドロビンで割り当てる
  tasks = [contracts[i % len(contracts)].functions.getReservesByPairs(pairs).call() for i, pairs in enumerate(chunks)]

  # タスクを並列に実行
  results = await asyncio.gather(*tasks)
  
  return results

# 関数を呼び出す
reserves = asyncio.run(getReservesParallel(pairDataList, providerList))

このコードは、NODE_URISリストに指定したノード間でリクエストを分割する必要があります。

ほとんどのプロバイダーは、複数のアカウントを作成することを望んでいないと思われるため、ノードを複数のプロバイダーに分散させるべきではなく、数十個以下のノードを作成する必要があります。

このコードには、web3pyライブラリを使用することによる追加コストも含まれています。これは多くのチェックと追加の操作を実行します。 このシリーズの後半で、ノードにJSON-RPCリクエストを手動で送信し、バイナリ結果を処理することにより時間を節約することができる、このコードのスリムバージョンが提供されます。

最後の注意点は、これらのスケーリング最適化の後でも、毎ブロック20万件以上のペアをクエリすることは実行不可能な場合があるということです。 幸いにも、これらのペアの大部分は価値のないシットコインを含んでおり、安全にフィルタリングできます。

ボーナス: アセンブリにおけるヘルパー コントラクトの最適化

次は、getReservesByPairs()関数のガスコストを削減するために、ヘルパーコントラクトをアセンブリで最適化する試みです:

//SPDX-License-Identifier: UNLICENSED
pragma solidity ^0.8;

contract UniswapFlashQuery {

  function getReservesByPairsAsm(address[] calldata _pairs) external view returns (bytes32[] memory) {
    
    // 配列は平坦化されている。アセンブリでは2次元配列は非常に複雑
    bytes32[] memory result = new bytes32[](_pairs.length * 3);

    assembly {
      // 戻り値のデータサイズ(reserve0, reserve1, blockTimestampLast)  
      let size := 0x60 // 3 * 32 bytes = 3 * 0x20 = 0x60 bytes

      // 関数セレクタのメモリを割り当てる
      let callData := mload(0x40)
      mstore(callData, 0x0902f1ac00000000000000000000000000000000000000000000000000000000)

      mstore(0x40, add(callData, 0x04)) // フリーメモリポインタを更新

      for { let i := 0 } lt(i, _pairs.length) { i := add(i, 1) } {
          
        // カルデータからペアアドレスをロード  
        let pair := calldataload(add(_pairs.offset, mul(i, 0x20)))
        
        // getReservesを呼び出し、結果をresult配列に書き込む
        let success := staticcall(gas(), pair, callData, 0x04, add(add(result, 0x20),mul(i, size)), size)
      }

      // フリーメモリポインタを更新  
      mstore(0x40, add(mload(0x40), mul(_pairs.length, size)))
    }

    return result;
  }

}

このコードはループ内の操作をできる限り少なくしようとしています。

CALL後の通常のif iszero(成功){リバート()}などのチェックは削除されており、ループ内に分岐が発生しないようにしています。

アセンブリをはるかに単純化できるように、戻り配列はもはや2次元配列ではなく、各ペアのリザーブが3つの連続スロットに格納された1次元のフラット配列です。 Pythonで同じ結果を取得するには、前のコントラクトと同じ結果を取得するために、軽量の処理が必要です:

# 呼び出しと平坦化
reserves = queryContract.functions.getReservesByPairsYul(pairs).call()

res = [int.from_bytes(elem, byteorder='big') for elem in res]

res = [res[i:i+3] for i in range(0, len(res), 3)]

残念ながらこのコードは以前のものよりもはるかに高速ではありません。トランザクション内で実行される代わりにeth_callで実行される場合、Solidityバージョンを使用するよりもYulアセンブリバージョンを使用すると、ガス使用量は約10900Gから約8700Gに減少します(20%の削減)。 改善されたガス消費は、ノードがペアコントラクトのストレージデータをフェッチするときに費やす時間のほとんどはIO操作であるため、速度向上には繋がりません。 1000ペアを取得すると、クエリの完了にかかる時間は1.73秒から1.72秒に減少し、無視できる差しかありません。

とはいえ、提供されたアセンブリコードを理解することは、EVMの動作とガスコストの最適化方法を理解する上での良い練習です。このシリーズで何度もカバーしていくトピックです。

結論

この記事では、V2ファクトリーコントラクトのイベントを使用してペアのリストを取得する方法と、ペアコントラクトのgetReserves()関数を使用してこれらのペアのリザーブを取得する方法について説明しました。 平行化などの最適化により、リザーブの取得を12秒のブロック時間のごく一部で実行できるようになりました。

次の記事では、アービトラージ機会の発見方法について説明します。 トークンペアを迅速にフィルタリングし、アービトラージ機会の最適なサイジングの式を導出します

Subscribe to ikaika
Receive the latest updates directly to your inbox.
Mint this entry as an NFT to add it to your collection.
Verification
This entry has been permanently stored onchain and signed by its creator.