ETH源码学习(4)sendTransaction
May 12th, 2022

看了getbalance和getblock,对leveldb存的内容有了大致的印象,下面就是怎么往里面存这些数据了。最基础的,发送一笔交易

简单总结流程:

  1. 将from打包成钱包
  2. 将请求参数包装成tx
  3. 签名
  4. 验证签名
  5. 验证交易是否合法
  6. 加入queue前的一些校验
  7. 加入pool的queue队列和all数组
  8. 加入pending前的一些校验
  9. 加入pool的pending队列
  10. 广播
//命令
eth.sendTransaction({from:'0xc57998cd5e4ad8bd82f7f6495771edb438d7452c' , to: '0xe16b36be71d89641bd2bae440b348bdcd93f127c', value: web3.toWei(1,"ether")})
//入口
func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args TransactionArgs) (common.Hash, error) {
	// Look up the wallet containing the requested signer
	account := accounts.Account{Address: args.from()}

	wallet, err := s.b.AccountManager().Find(account)
	if err != nil {
		return common.Hash{}, err
	}

	if args.Nonce == nil {
		// Hold the addresse's mutex around signing to prevent concurrent assignment of
		// the same nonce to multiple accounts.
		s.nonceLock.LockAddr(args.from())
		defer s.nonceLock.UnlockAddr(args.from())
	}

	// Set some sanity defaults and terminate on failure
	if err := args.setDefaults(ctx, s.b); err != nil {
		return common.Hash{}, err
	}
	// Assemble the transaction and sign with the wallet
        // 打包请求参数,封装成tx
	tx := args.toTransaction()
        //签名
	signed, err := wallet.SignTx(account, tx, s.b.ChainConfig().ChainID)
	if err != nil {
		return common.Hash{}, err
	}
        //处理交易
	return SubmitTransaction(ctx, s.b, signed)
}

签名

//使用规定的规范进行签名,我的版本是伦敦升级版本
func SignTx(tx *Transaction, s Signer, prv *ecdsa.PrivateKey) (*Transaction, error) {
  //1.对交易进行哈希
   h := s.Hash(tx)
//2.生成签名,里面就看不懂了,总之就是算出R,S,V,在放入tx中
   sig, err := crypto.Sign(h[:], prv)
   if err != nil {
      return nil, err
   }
//3.将签名数据填充到Tx信息中
   return tx.WithSignature(s, sig)
}
type txLookup struct {
   slots   int
   lock    sync.RWMutex
   locals  map[common.Hash]*types.Transaction
   remotes map[common.Hash]*types.Transaction
}

交易加入pool

//前面跳过了一些可以不用关心的步骤
func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
	// If the transaction is already known, discard it
//获取交易hash并以此判断交易池中是否已存在该笔交易
	hash := tx.Hash()
	if pool.all.Get(hash) != nil {
		log.Trace("Discarding already known transaction", "hash", hash)
		knownTxMeter.Mark(1)
		return false, ErrAlreadyKnown
	}
	// Make the local flag. If it's from local source or it's from the network but
	// the sender is marked as local previously, treat it as the local transaction.
	isLocal := local || pool.locals.containsTx(tx)

	// If the transaction fails basic validation, discard it
// 验证交易合法性
	if err := pool.validateTx(tx, isLocal); err != nil {
		log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
		invalidTxMeter.Mark(1)
		return false, err
	}
	// If the transaction pool is full, discard underpriced transactions
// 如果交易池已满,按priced数组中gas price较低的交易剔除
	if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
		// If the new transaction is underpriced, don't accept it
		if !isLocal && pool.priced.Underpriced(tx) {
			log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
			underpricedTxMeter.Mark(1)
			return false, ErrUnderpriced
		}
		// New transaction is better than our worse ones, make room for it.
		// If it's a local transaction, forcibly discard all available transactions.
		// Otherwise if we can't make enough room for new one, abort the operation.
		drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)

		// Special case, we still can't make the room for the new remote one.
		if !isLocal && !success {
			log.Trace("Discarding overflown transaction", "hash", hash)
			overflowedTxMeter.Mark(1)
			return false, ErrTxPoolOverflow
		}
		// Kick out the underpriced remote transactions.
		for _, tx := range drop {
			log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
			underpricedTxMeter.Mark(1)
			pool.removeTx(tx.Hash(), false)
		}
	}
	// Try to replace an existing transaction in the pending pool
// 如果交易已经存在于pending列表,比较新旧交易gasPrice的差值是否超过PriceBump
    // 若超过则使用新交易代替旧交易
	from, _ := types.Sender(pool.signer, tx) // already validated
	if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
		// Nonce already pending, check if required price bump is met
		inserted, old := list.Add(tx, pool.config.PriceBump)
		if !inserted {
			pendingDiscardMeter.Mark(1)
			return false, ErrReplaceUnderpriced
		}
		// New transaction is better, replace old one
		if old != nil {
			pool.all.Remove(old.Hash())
			pool.priced.Removed(1)
			pendingReplaceMeter.Mark(1)
		}
		pool.all.Add(tx, isLocal)
		pool.priced.Put(tx, isLocal)
		pool.journalTx(from, tx)
		pool.queueTxEvent(tx)
		log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

		// Successful promotion, bump the heartbeat
		pool.beats[from] = time.Now()
		return old != nil, nil
	}
	// New transaction isn't replacing a pending one, push into queue
// 将交易添加到queue队列
	replaced, err = pool.enqueueTx(hash, tx, isLocal, true)
	if err != nil {
		return false, err
	}
	// Mark local addresses and journal local transactions
// 判断是否本地交易,保证本地交易优先被加入到TxPool
	if local && !pool.locals.contains(from) {
		log.Info("Setting new local account", "address", from)
		pool.locals.add(from)
		pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time.
	}
	if isLocal {
		localGauge.Inc(1)
	}
	pool.journalTx(from, tx)

	log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
	return replaced, nil
}
//验证tx是否合法
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
   // Accept only legacy transactions until EIP-2718/2930 activates.
   if !pool.eip2718 && tx.Type() != types.LegacyTxType {
      return ErrTxTypeNotSupported
   }
   // Reject dynamic fee transactions until EIP-1559 activates.
   if !pool.eip1559 && tx.Type() == types.DynamicFeeTxType {
      return ErrTxTypeNotSupported
   }
   // Reject transactions over defined size to prevent DOS attacks
   if uint64(tx.Size()) > txMaxSize {
      return ErrOversizedData
   }
   // Transactions can't be negative. This may never happen using RLP decoded
   // transactions but may occur if you create a transaction using the RPC.
   if tx.Value().Sign() < 0 {
      return ErrNegativeValue
   }
   // Ensure the transaction doesn't exceed the current block limit gas.
   if pool.currentMaxGas < tx.Gas() {
      return ErrGasLimit
   }
   // Sanity check for extremely large numbers
   if tx.GasFeeCap().BitLen() > 256 {
      return ErrFeeCapVeryHigh
   }
   if tx.GasTipCap().BitLen() > 256 {
      return ErrTipVeryHigh
   }
   // Ensure gasFeeCap is greater than or equal to gasTipCap.
   if tx.GasFeeCapIntCmp(tx.GasTipCap()) < 0 {
      return ErrTipAboveFeeCap
   }
   // Make sure the transaction is signed properly.
   from, err := types.Sender(pool.signer, tx)
   if err != nil {
      return ErrInvalidSender
   }
   // Drop non-local transactions under our own minimal accepted gas price or tip
   if !local && tx.GasTipCapIntCmp(pool.gasPrice) < 0 {
      return ErrUnderpriced
   }
   // Ensure the transaction adheres to nonce ordering
   if pool.currentState.GetNonce(from) > tx.Nonce() {
      return ErrNonceTooLow
   }
   // Transactor should have enough funds to cover the costs
   // cost == V + GP * GL
   if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
      return ErrInsufficientFunds
   }
   // Ensure the transaction has more gas than the basic tx fee.
   intrGas, err := IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul)
   if err != nil {
      return err
   }
   if tx.Gas() < intrGas {
      return ErrIntrinsicGas
   }
   return nil
}
//将交易添加到queue队列,这个队列是代表暂时不可处理的交易队列
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local bool, addAll bool) (bool, error) {
   // Try to insert the transaction into the future queue
  //这个account是否有tx list,没有的话创建一个 
  from, _ := types.Sender(pool.signer, tx) // already validated
   if pool.queue[from] == nil {
      pool.queue[from] = newTxList(false)
   }
   inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
   if !inserted {
      // An older transaction was better, discard this
      queuedDiscardMeter.Mark(1)
      return false, ErrReplaceUnderpriced
   }
   // Discard any previous transaction and mark this
   if old != nil {
      pool.all.Remove(old.Hash())
      pool.priced.Removed(1)
      queuedReplaceMeter.Mark(1)
   } else {
      // Nothing was replaced, bump the queued counter
      queuedGauge.Inc(1)
   }
   // If the transaction isn't in lookup set but it's expected to be there,
   // show the error log.
//all队列是否有这笔交易
   if pool.all.Get(hash) == nil && !addAll {
      log.Error("Missing transaction in lookup set, please report the issue", "hash", hash)
   }
   if addAll {
//将这个tx添加到all队列
      pool.all.Add(tx, local)
      pool.priced.Put(tx, local)
   }
   // If we never record the heartbeat, do it right now.
   if _, exist := pool.beats[from]; !exist {
      pool.beats[from] = time.Now()
   }
   return old != nil, nil
}
func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
   // Track the promoted transactions to broadcast them at once
   var promoted []*types.Transaction

   // Iterate over all accounts and promote any executable transactions
   for _, addr := range accounts {
      list := pool.queue[addr]
      if list == nil {
         continue // Just in case someone calls with a non existing account
      }
      // Drop all transactions that are deemed too old (low nonce)
// 1.1丢弃交易nonce值 < 账户当前nonce的交易
      forwards := list.Forward(pool.currentState.GetNonce(addr))
      for _, tx := range forwards {
         hash := tx.Hash()
         pool.all.Remove(hash)
      }
      log.Trace("Removed old queued transactions", "count", len(forwards))
      // Drop all transactions that are too costly (low balance or out of gas)
// 1.2.丢弃账户余额不足的
      drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
      for _, tx := range drops {
         hash := tx.Hash()
         pool.all.Remove(hash)
      }
      log.Trace("Removed unpayable queued transactions", "count", len(drops))
      queuedNofundsMeter.Mark(int64(len(drops)))

      // Gather all executable transactions and promote them
// 3.将交易添加到pending列表
      readies := list.Ready(pool.pendingNonces.get(addr))
      for _, tx := range readies {
         hash := tx.Hash()
         //广播tx
         if pool.promoteTx(addr, hash, tx) {
            promoted = append(promoted, tx)
         }
      }
      log.Trace("Promoted queued transactions", "count", len(promoted))
      queuedGauge.Dec(int64(len(readies)))

      // Drop all transactions over the allowed limit
      var caps types.Transactions
      if !pool.locals.contains(addr) {
         caps = list.Cap(int(pool.config.AccountQueue))
         for _, tx := range caps {
            hash := tx.Hash()
            pool.all.Remove(hash)
            log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
         }
         queuedRateLimitMeter.Mark(int64(len(caps)))
      }
      // Mark all the items dropped as removed
      pool.priced.Removed(len(forwards) + len(drops) + len(caps))
      queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
      if pool.locals.contains(addr) {
         localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
      }
      // Delete the entire queue entry if it became empty.
      if list.Empty() {
         delete(pool.queue, addr)
         delete(pool.beats, addr)
      }
   }
   return promoted
}
Subscribe to point
Receive the latest updates directly to your inbox.
Verification
This entry has been permanently stored onchain and signed by its creator.
More from point

Skeleton

Skeleton

Skeleton