概念
Vyper的核心理念安全性&可审计性&简洁,它设计之初就将安全性放在首位,通过强制执行一系列严格的语言特性来最大限度地降低漏洞风险。比如Vyper严格禁止递归调用,只通过限制外部调用和状态修改的顺序来确保合约执行的原子性,以及Vyper强制使用定点数运算而不是浮点数来消除了浮点数精度问题可能导致的安全隐患,另外还排除了许多复杂的和可能引入漏洞的语言特性,像是继承、修饰符、无限循环和汇编。
Vyper每次横向比较都是Solidity,后者虽然灵活但是也有更高的风险,Vyper则是通过牺牲一部分灵活性来换取安全性,强制开发者编写更简洁和更可预测的代码,所以Vyper 的功能集相对有限,不适合实现所有复杂的合约逻辑。
特色设计有诸如不支持类继承,合约之间无法通过继承共享代码,而是必须通过组合的方式来构建功能来避免了继承链过长带来的风险,以及强制要求所有变量在声明时明确指定其可见性并且不允许隐式类型转换。Vyper最适合对安全性要求极高的金融类合约和需要频繁审计且逻辑相对简单的合约。
因为有点Solidity的基础所以不需要从零开始,只需要理解两者的差异。
Vyper不支持类继承,在 Solidity中我习惯用继承来复用代码, Vyper这边是需要更多地采用composition的方式将功能模块化并在合约内部进行调用,没法通过继承链来传递。
Solidity的修饰符可以方便地在函数执行前后添加检查或逻辑,Vyper摒弃了修饰符,因为认为它会模糊函数的实际行为,我需要在函数内部显式地使用 assert 或 require 语句来执行条件检查。 Vyper不允许同名但参数不同的函数,以及所有循环都必须是有界的且不支持递归调用,这是为了确保合约的gas消耗始终可预测从而有效防止DoS攻击,那么我就需要重新思考如何在没有这些功能的情况下实现循环逻辑(?)。
想要开发一个基于Vyper的真实合约就需要先明确功能,我一开始暂定了在基础功能上添加黑名单、增发销毁、锁仓和安全机制,后面加入了快照、无gas费授权、代币归属和锁仓、合约暂停和所有权与权限管理。
遇到的坑
就不赘述太多基础,留档记录一下遇到的几个坑qwq方便以后绕过和待优化
1. Vesting 释放计算的精度问题,在release函数里面的releasable_total的计算使用了 schedule.total_amount * elapsed / schedule.duration 。由于Vyper和EVM不支持浮点运算,所有的除法都是整数除法。会导致精度损失,特别是在releasable_total *计算中,如果 *schedule.total_amount * elapsed *不能被 schedule.duration *整除,就会向下取整。
这会导致实际释放的代币总量略略略略小于用户应得的总量,不太清楚如何处理这部分精度损失,是累积到合约中还是通过什么方式补偿?
我想的是通过* releasable_total: uint256 = schedule.total_amount * elapsed / schedule.duration 来解决,先乘后除的方式可以最大程度地保留精度,因为乘法会扩大数值让除法的结果更接近真实值,并且Vyper的整数除法会自动向下取整,宁愿少释放一点也不要多释放)。然后通过releasable: uint256 = releasable_total - schedule.released *计算出实际可释放的金额。
2. Re-entrancy Attack在release函数中涉及了从合约余额到用户余额的 _transfer_with_snapshot操作,如果外部调用在转账过程中再次调用release和其他修改合约状态的函数会导致意外的行为或资金损失。
这里是一个重入守卫机制,用 _lock() 和 _unlock() 内部函数来管理 self.unlocked 状态变量 。在 release 函数的开头调用 _lock() 断言 self.unlocked 为 True 并设置为 False,在函数结束时调用 _unlock() 将其重新设置为 True 。这样,在 _lock() 和 _unlock() 之间的任何对合约的外部调用都会因为 self.unlocked 为 False 而被阻止。
3. 快照功能是能够查询特定账户在过去某个快照ID时的余额,需要每次余额变动时记录快照并在查询时能高效地找到对应快照的值,暴力复制的方法太麻烦了,所以参考了其他项目的写时复制的逻辑。
不在调用snapshot()时去记录任何余额,只在账户余额即将发生变动的那一刻才去更新,_update_balance_snapshot的是核心核心,会在余额变动前被调用检查当前最新的快照ID是否大于该账户最后一次被记录快照时的ID。如果是才会将该账户当前的余额和新的快照ID作为一个 BalanceSnapshot结构体,然后追加到这个账户专属的动态数组中 。
想了一个反向查询balanceOfAt,查询历史余额时用的是反向遍历,它从一个账户的快照记录数组的末尾开始向前查找。它寻找的第一个ID小于或等于目标的记录就是那个时间点的正确余额。
4. 在合约的构造函数中需要动态地获取当前部署链的chainId来构建EIP-712的DOMAIN_SEPARATOR,转化原地打转挺久。
后续
下一步是单元测试合约中的每一个external和public函数,打算学学用 ape-vyper来编写测试用例,优化一下高Gas消耗的操作。
合约代码
# @version ^0.3.10
# @evm-version paris
# === Events ===
event Transfer:
sender: indexed(address)
receiver: indexed(address)
value: uint256
event Approval:
owner: indexed(address)
spender: indexed(address)
value: uint256
event OwnershipProposed:
new_owner: indexed(address)
effective_time: uint256
event OwnershipClaimed:
previous_owner: indexed(address)
new_owner: indexed(address)
event Mint:
to: indexed(address)
amount: uint256
event Burn:
sender: indexed(address)
amount: uint256
event Snapshot:
id: uint256
event Paused:
status: bool
event VestingScheduleCreated:
user: indexed(address)
schedule_id: indexed(uint256)
amount: uint256
start: uint256
duration: uint256
event VestingReleased:
user: indexed(address)
schedule_id: indexed(uint256)
amount: uint256
# === Structs ===
struct OwnerProposal:
new_owner: address
effective_time: uint256
struct VestingSchedule:
initialized: bool
total_amount: uint256
start_time: uint256
duration: uint256
released: uint256
struct BalanceSnapshot:
id: uint256
value: uint256
# === Constants ===
PERMIT_TYPEHASH: constant(bytes32) = keccak256("Permit(address owner,address spender,uint256 value,uint256 nonce,uint256 deadline)")
OWNERSHIP_DELAY: constant(uint256) = 86400 # 24 hours
MAX_SNAPSHOTS_PER_ACCOUNT: constant(uint256) = 1000
# === Token Config ===
name: public(String[32])
symbol: public(String[8])
decimals: public(uint256)
total_supply: public(uint256)
cap: public(uint256)
# === Token Accounting ===
balances: HashMap[address, uint256]
allowances: HashMap[address, HashMap[address, uint256]]
next_schedule_id: public(uint256)
vesting: HashMap[address, HashMap[uint256, VestingSchedule]]
# === State Variables for Advanced Features ===
DOMAIN_SEPARATOR: public(bytes32)
nonces: public(HashMap[address, uint256])
owner: public(address)
owner_proposal: public(OwnerProposal)
unlocked: bool
account_snapshots: HashMap[address, DynArray[BalanceSnapshot, MAX_SNAPSHOTS_PER_ACCOUNT]]
snapshot_count: public(uint256)
# === Internal Helper Functions ===
@internal
def _only_owner():
assert msg.sender == self.owner, "Auth: sender is not owner"
# === Re-entrancy Guard ===
@internal
def _lock():
assert self.unlocked, "ReentrancyGuard: reentrant call"
self.unlocked = False
@internal
def _unlock():
self.unlocked = True
# === Timelock for Ownership ===
@external
def proposeOwnership(new_owner: address):
self._only_owner()
assert new_owner != empty(address), "Auth: new owner is zero address"
effective_time: uint256 = block.timestamp + OWNERSHIP_DELAY
self.owner_proposal = OwnerProposal({
new_owner: new_owner,
effective_time: effective_time
})
log OwnershipProposed(new_owner, effective_time)
@external
def claimOwnership():
proposal: OwnerProposal = self.owner_proposal
assert msg.sender == proposal.new_owner, "Auth: not proposed owner"
assert block.timestamp >= proposal.effective_time, "Auth: timelock not passed"
prev_owner: address = self.owner
self.owner = proposal.new_owner
self.owner_proposal = empty(OwnerProposal)
log OwnershipClaimed(prev_owner, self.owner)
# === Pause Functionality ===
paused: public(bool)
@external
def pause():
self._only_owner()
self.paused = True
log Paused(True)
@external
def unpause():
self._only_owner()
self.paused = False
log Paused(False)
@internal
def _not_paused():
assert not self.paused, "State: contract paused"
# === Blacklist ===
blacklist: HashMap[address, bool]
@external
def addToBlacklist(addr: address):
self._only_owner()
self.blacklist[addr] = True
@external
def removeFromBlacklist(addr: address):
self._only_owner()
self.blacklist[addr] = False
@internal
def _not_blacklisted(addr: address):
assert not self.blacklist[addr], "Auth: address is blacklisted"
# === Snapshotting ===
@internal
def _update_balance_snapshot(account: address):
count: uint256 = self.snapshot_count
if count == 0:
return
snapshots: DynArray[BalanceSnapshot, MAX_SNAPSHOTS_PER_ACCOUNT] = self.account_snapshots[account]
snap_len: uint256 = len(snapshots)
if snap_len == 0 or snapshots[snap_len - 1].id < count:
snapshots.append(BalanceSnapshot({
id: count,
value: self.balances[account]
}))
self.account_snapshots[account] = snapshots
@external
def snapshot() -> uint256:
self._only_owner()
self.snapshot_count += 1
log Snapshot(self.snapshot_count)
return self.snapshot_count
@view
@external
def balanceOfAt(account: address, snapshot_id: uint256) -> uint256:
assert snapshot_id > 0 and snapshot_id <= self.snapshot_count, "Snapshot: invalid id"
snapshots: DynArray[BalanceSnapshot, MAX_SNAPSHOTS_PER_ACCOUNT] = self.account_snapshots[account]
snap_len: uint256 = len(snapshots)
if snap_len == 0:
return 0
# FINAL FIX: Loop with a constant bound and break early.
for i in range(MAX_SNAPSHOTS_PER_ACCOUNT):
if i >= snap_len:
break
snap: BalanceSnapshot = snapshots[(snap_len - 1) - i]
if snap.id <= snapshot_id:
return snap.value
return 0
# === Internal Core Logic with Snapshotting ===
@internal
def _transfer_with_snapshot(_from: address, _to: address, _amount: uint256):
self._update_balance_snapshot(_from)
self._update_balance_snapshot(_to)
from_balance: uint256 = self.balances[_from]
assert from_balance >= _amount, "ERC20: insufficient balance"
self.balances[_from] = from_balance - _amount
self.balances[_to] += _amount
log Transfer(_from, _to, _amount)
@internal
def _mint_with_snapshot(_to: address, _amount: uint256):
self._update_balance_snapshot(empty(address))
self._update_balance_snapshot(_to)
assert self.total_supply + _amount <= self.cap, "ERC20: cap exceeded"
self.balances[_to] += _amount
self.total_supply += _amount
log Mint(_to, _amount)
log Transfer(empty(address), _to, _amount)
@internal
def _burn_with_snapshot(_from: address, _amount: uint256):
self._update_balance_snapshot(_from)
self._update_balance_snapshot(empty(address))
from_balance: uint256 = self.balances[_from]
assert from_balance >= _amount, "ERC20: insufficient balance"
self.balances[_from] = from_balance - _amount
self.total_supply -= _amount
log Burn(_from, _amount)
log Transfer(_from, empty(address), _amount)
# === Constructor ===
@external
def __init__(_name: String[32], _symbol: String[8], _decimals: uint256, _initial_supply: uint256, _cap: uint256):
self.name = _name
self.symbol = _symbol
self.decimals = _decimals
assert _cap >= _initial_supply, "ERC20: cap < initial supply"
self.cap = _cap
self.owner = msg.sender
self.unlocked = True
self.snapshot_count = 0
# FIXED: use _abi_encode
domain_hash: bytes32 = keccak256(
_abi_encode(
keccak256("EIP712Domain(string name,string version,uint256 chainId,address verifyingContract)"),
keccak256(_name),
keccak256("1"),
convert(chain.id, bytes32),
convert(self, bytes32)
)
)
self.DOMAIN_SEPARATOR = domain_hash
if _initial_supply > 0:
self._mint_with_snapshot(msg.sender, _initial_supply)
# === ERC20 Mutable Functions ===
@external
def transfer(to: address, amount: uint256) -> bool:
assert to != empty(address), "ERC20: transfer to the zero address"
self._not_paused()
self._not_blacklisted(msg.sender)
self._not_blacklisted(to)
self._transfer_with_snapshot(msg.sender, to, amount)
return True
@external
def transferFrom(_from: address, to: address, amount: uint256) -> bool:
assert to != empty(address), "ERC20: transfer to the zero address"
self._not_paused()
self._not_blacklisted(msg.sender)
self._not_blacklisted(_from)
self._not_blacklisted(to)
allowed: uint256 = self.allowances[_from][msg.sender]
assert allowed >= amount, "ERC20: allowance exceeded"
if allowed != max_value(uint256):
self.allowances[_from][msg.sender] = allowed - amount
self._transfer_with_snapshot(_from, to, amount)
return True
# === Mint & Burn ===
@external
def mint(to: address, amount: uint256):
self._only_owner()
self._not_paused()
self._mint_with_snapshot(to, amount)
@external
def burn(amount: uint256):
self._not_paused()
self._burn_with_snapshot(msg.sender, amount)
# === Vesting ===
@external
def createVestingSchedule(user: address, amount: uint256, start: uint256, duration: uint256) -> uint256:
self._only_owner()
assert amount > 0, "Vesting: amount must be > 0"
assert duration > 0, "Vesting: duration must be > 0"
self._transfer_with_snapshot(msg.sender, self, amount)
schedule_id: uint256 = self.next_schedule_id
self.vesting[user][schedule_id] = VestingSchedule({
initialized: True,
total_amount: amount,
start_time: start,
duration: duration,
released: 0
})
self.next_schedule_id += 1
log VestingScheduleCreated(user, schedule_id, amount, start, duration)
return schedule_id
@external
def release(user: address, schedule_id: uint256):
self._lock()
self._not_paused()
schedule: VestingSchedule = self.vesting[user][schedule_id]
assert schedule.initialized, "Vesting: schedule does not exist"
assert block.timestamp >= schedule.start_time, "Vesting: not started yet"
elapsed: uint256 = block.timestamp - schedule.start_time
if elapsed > schedule.duration:
elapsed = schedule.duration
releasable_total: uint256 = schedule.total_amount * elapsed / schedule.duration
releasable: uint256 = releasable_total - schedule.released
assert releasable > 0, "Vesting: nothing to release"
self.vesting[user][schedule_id].released += releasable
self._transfer_with_snapshot(self, user, releasable)
log VestingReleased(user, schedule_id, releasable)
self._unlock()
# === EIP-712 Permit Function ===
@internal
@view
def _build_permit_hash(_owner: address, _spender: address, _value: uint256, _nonce: uint256, _deadline: uint256) -> bytes32:
# FIXED: use _abi_encode
permit_hash: bytes32 = keccak256(
_abi_encode(
PERMIT_TYPEHASH,
_owner,
_spender,
_value,
_nonce,
_deadline
)
)
return keccak256(concat(b"\x19\x01", self.DOMAIN_SEPARATOR, permit_hash))
@external
def permit(_owner: address, spender: address, _value: uint256, deadline: uint256, v: uint8, r: bytes32, s: bytes32):
assert block.timestamp <= deadline, "Permit: expired deadline"
current_nonce: uint256 = self.nonces[_owner]
digest: bytes32 = self._build_permit_hash(_owner, spender, _value, current_nonce, deadline)
signer: address = ecrecover(digest, v, r, s)
assert signer != empty(address), "Permit: invalid signature"
assert signer == _owner, "Permit: invalid owner"
self.allowances[_owner][spender] = _value
self.nonces[_owner] = current_nonce + 1
log Approval(_owner, spender, _value)
# === Other ERC20 Functions ===
@external
def approve(spender: address, amount: uint256) -> bool:
self._not_paused()
self._not_blacklisted(msg.sender)
self._not_blacklisted(spender)
self.allowances[msg.sender][spender] = amount
log Approval(msg.sender, spender, amount)
return True
@external
def increaseAllowance(spender: address, added_value: uint256) -> bool:
self._not_paused()
self._not_blacklisted(msg.sender)
self._not_blacklisted(spender)
current_allowance: uint256 = self.allowances[msg.sender][spender]
new_allowance: uint256 = current_allowance + added_value
self.allowances[msg.sender][spender] = new_allowance
log Approval(msg.sender, spender, new_allowance)
return True
@external
def decreaseAllowance(spender: address, subtracted_value: uint256) -> bool:
self._not_paused()
self._not_blacklisted(msg.sender)
self._not_blacklisted(spender)
current_allowance: uint256 = self.allowances[msg.sender][spender]
assert current_allowance >= subtracted_value, "ERC20: allowance is lower"
new_allowance: uint256 = current_allowance - subtracted_value
self.allowances[msg.sender][spender] = new_allowance
log Approval(msg.sender, spender, new_allowance)
return True
# === View functions ===
@external
@view
def balanceOf(account: address) -> uint256:
return self.balances[account]
@external
@view
def allowance(owner: address, spender: address) -> uint256:
return self.allowances[owner][spender]