看了getbalance和getblock,对leveldb存的内容有了大致的印象,下面就是怎么往里面存这些数据了。最基础的,发送一笔交易
//命令
eth.sendTransaction({from:'0xc57998cd5e4ad8bd82f7f6495771edb438d7452c' , to: '0xe16b36be71d89641bd2bae440b348bdcd93f127c', value: web3.toWei(1,"ether")})
//入口
func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args TransactionArgs) (common.Hash, error) {
// Look up the wallet containing the requested signer
account := accounts.Account{Address: args.from()}
wallet, err := s.b.AccountManager().Find(account)
if err != nil {
return common.Hash{}, err
}
if args.Nonce == nil {
// Hold the addresse's mutex around signing to prevent concurrent assignment of
// the same nonce to multiple accounts.
s.nonceLock.LockAddr(args.from())
defer s.nonceLock.UnlockAddr(args.from())
}
// Set some sanity defaults and terminate on failure
if err := args.setDefaults(ctx, s.b); err != nil {
return common.Hash{}, err
}
// Assemble the transaction and sign with the wallet
// 打包请求参数,封装成tx
tx := args.toTransaction()
//签名
signed, err := wallet.SignTx(account, tx, s.b.ChainConfig().ChainID)
if err != nil {
return common.Hash{}, err
}
//处理交易
return SubmitTransaction(ctx, s.b, signed)
}
//使用规定的规范进行签名,我的版本是伦敦升级版本
func SignTx(tx *Transaction, s Signer, prv *ecdsa.PrivateKey) (*Transaction, error) {
//1.对交易进行哈希
h := s.Hash(tx)
//2.生成签名,里面就看不懂了,总之就是算出R,S,V,在放入tx中
sig, err := crypto.Sign(h[:], prv)
if err != nil {
return nil, err
}
//3.将签名数据填充到Tx信息中
return tx.WithSignature(s, sig)
}
type txLookup struct {
slots int
lock sync.RWMutex
locals map[common.Hash]*types.Transaction
remotes map[common.Hash]*types.Transaction
}
//前面跳过了一些可以不用关心的步骤
func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
// If the transaction is already known, discard it
//获取交易hash并以此判断交易池中是否已存在该笔交易
hash := tx.Hash()
if pool.all.Get(hash) != nil {
log.Trace("Discarding already known transaction", "hash", hash)
knownTxMeter.Mark(1)
return false, ErrAlreadyKnown
}
// Make the local flag. If it's from local source or it's from the network but
// the sender is marked as local previously, treat it as the local transaction.
isLocal := local || pool.locals.containsTx(tx)
// If the transaction fails basic validation, discard it
// 验证交易合法性
if err := pool.validateTx(tx, isLocal); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
invalidTxMeter.Mark(1)
return false, err
}
// If the transaction pool is full, discard underpriced transactions
// 如果交易池已满,按priced数组中gas price较低的交易剔除
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
if !isLocal && pool.priced.Underpriced(tx) {
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
return false, ErrUnderpriced
}
// New transaction is better than our worse ones, make room for it.
// If it's a local transaction, forcibly discard all available transactions.
// Otherwise if we can't make enough room for new one, abort the operation.
drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)
// Special case, we still can't make the room for the new remote one.
if !isLocal && !success {
log.Trace("Discarding overflown transaction", "hash", hash)
overflowedTxMeter.Mark(1)
return false, ErrTxPoolOverflow
}
// Kick out the underpriced remote transactions.
for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
pool.removeTx(tx.Hash(), false)
}
}
// Try to replace an existing transaction in the pending pool
// 如果交易已经存在于pending列表,比较新旧交易gasPrice的差值是否超过PriceBump
// 若超过则使用新交易代替旧交易
from, _ := types.Sender(pool.signer, tx) // already validated
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
pendingDiscardMeter.Mark(1)
return false, ErrReplaceUnderpriced
}
// New transaction is better, replace old one
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
pendingReplaceMeter.Mark(1)
}
pool.all.Add(tx, isLocal)
pool.priced.Put(tx, isLocal)
pool.journalTx(from, tx)
pool.queueTxEvent(tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
// Successful promotion, bump the heartbeat
pool.beats[from] = time.Now()
return old != nil, nil
}
// New transaction isn't replacing a pending one, push into queue
// 将交易添加到queue队列
replaced, err = pool.enqueueTx(hash, tx, isLocal, true)
if err != nil {
return false, err
}
// Mark local addresses and journal local transactions
// 判断是否本地交易,保证本地交易优先被加入到TxPool
if local && !pool.locals.contains(from) {
log.Info("Setting new local account", "address", from)
pool.locals.add(from)
pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time.
}
if isLocal {
localGauge.Inc(1)
}
pool.journalTx(from, tx)
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replaced, nil
}
//验证tx是否合法
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// Accept only legacy transactions until EIP-2718/2930 activates.
if !pool.eip2718 && tx.Type() != types.LegacyTxType {
return ErrTxTypeNotSupported
}
// Reject dynamic fee transactions until EIP-1559 activates.
if !pool.eip1559 && tx.Type() == types.DynamicFeeTxType {
return ErrTxTypeNotSupported
}
// Reject transactions over defined size to prevent DOS attacks
if uint64(tx.Size()) > txMaxSize {
return ErrOversizedData
}
// Transactions can't be negative. This may never happen using RLP decoded
// transactions but may occur if you create a transaction using the RPC.
if tx.Value().Sign() < 0 {
return ErrNegativeValue
}
// Ensure the transaction doesn't exceed the current block limit gas.
if pool.currentMaxGas < tx.Gas() {
return ErrGasLimit
}
// Sanity check for extremely large numbers
if tx.GasFeeCap().BitLen() > 256 {
return ErrFeeCapVeryHigh
}
if tx.GasTipCap().BitLen() > 256 {
return ErrTipVeryHigh
}
// Ensure gasFeeCap is greater than or equal to gasTipCap.
if tx.GasFeeCapIntCmp(tx.GasTipCap()) < 0 {
return ErrTipAboveFeeCap
}
// Make sure the transaction is signed properly.
from, err := types.Sender(pool.signer, tx)
if err != nil {
return ErrInvalidSender
}
// Drop non-local transactions under our own minimal accepted gas price or tip
if !local && tx.GasTipCapIntCmp(pool.gasPrice) < 0 {
return ErrUnderpriced
}
// Ensure the transaction adheres to nonce ordering
if pool.currentState.GetNonce(from) > tx.Nonce() {
return ErrNonceTooLow
}
// Transactor should have enough funds to cover the costs
// cost == V + GP * GL
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds
}
// Ensure the transaction has more gas than the basic tx fee.
intrGas, err := IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul)
if err != nil {
return err
}
if tx.Gas() < intrGas {
return ErrIntrinsicGas
}
return nil
}
//将交易添加到queue队列,这个队列是代表暂时不可处理的交易队列
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local bool, addAll bool) (bool, error) {
// Try to insert the transaction into the future queue
//这个account是否有tx list,没有的话创建一个
from, _ := types.Sender(pool.signer, tx) // already validated
if pool.queue[from] == nil {
pool.queue[from] = newTxList(false)
}
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
if !inserted {
// An older transaction was better, discard this
queuedDiscardMeter.Mark(1)
return false, ErrReplaceUnderpriced
}
// Discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
queuedReplaceMeter.Mark(1)
} else {
// Nothing was replaced, bump the queued counter
queuedGauge.Inc(1)
}
// If the transaction isn't in lookup set but it's expected to be there,
// show the error log.
//all队列是否有这笔交易
if pool.all.Get(hash) == nil && !addAll {
log.Error("Missing transaction in lookup set, please report the issue", "hash", hash)
}
if addAll {
//将这个tx添加到all队列
pool.all.Add(tx, local)
pool.priced.Put(tx, local)
}
// If we never record the heartbeat, do it right now.
if _, exist := pool.beats[from]; !exist {
pool.beats[from] = time.Now()
}
return old != nil, nil
}
func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction
// Iterate over all accounts and promote any executable transactions
for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
continue // Just in case someone calls with a non existing account
}
// Drop all transactions that are deemed too old (low nonce)
// 1.1丢弃交易nonce值 < 账户当前nonce的交易
forwards := list.Forward(pool.currentState.GetNonce(addr))
for _, tx := range forwards {
hash := tx.Hash()
pool.all.Remove(hash)
}
log.Trace("Removed old queued transactions", "count", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas)
// 1.2.丢弃账户余额不足的
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
pool.all.Remove(hash)
}
log.Trace("Removed unpayable queued transactions", "count", len(drops))
queuedNofundsMeter.Mark(int64(len(drops)))
// Gather all executable transactions and promote them
// 3.将交易添加到pending列表
readies := list.Ready(pool.pendingNonces.get(addr))
for _, tx := range readies {
hash := tx.Hash()
//广播tx
if pool.promoteTx(addr, hash, tx) {
promoted = append(promoted, tx)
}
}
log.Trace("Promoted queued transactions", "count", len(promoted))
queuedGauge.Dec(int64(len(readies)))
// Drop all transactions over the allowed limit
var caps types.Transactions
if !pool.locals.contains(addr) {
caps = list.Cap(int(pool.config.AccountQueue))
for _, tx := range caps {
hash := tx.Hash()
pool.all.Remove(hash)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
}
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.queue, addr)
delete(pool.beats, addr)
}
}
return promoted
}