-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_commit_reveal.py
197 lines (165 loc) · 5.94 KB
/
test_commit_reveal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import pytest
import time
from bittensor_commit_reveal import get_encrypted_commit
SUBTENSOR_PULSE_DELAY = 24
PERIOD = 3 # Drand period in seconds
GENESIS_TIME = 1692803367
def test_get_encrypted_commits():
uids = [1, 2]
weights = [11, 22]
version_key = 50
tempo = 100
current_block = 1000
netuid = 1
reveal_period = 2
block_time = 12
start_time = int(time.time())
ct_pybytes, reveal_round = get_encrypted_commit(
uids,
weights,
version_key,
tempo,
current_block,
netuid,
reveal_period,
block_time,
)
# Basic checks
assert (
ct_pybytes is not None and len(ct_pybytes) > 0
), "Ciphertext should not be empty"
assert reveal_round > 0, "Reveal round should be positive"
expected_reveal_round, _, _ = compute_expected_reveal_round(
start_time, tempo, current_block, netuid, reveal_period, block_time
)
# The reveal_round should be close to what we predict
assert (
abs(reveal_round - expected_reveal_round) <= 1
), f"Reveal round {reveal_round} not close to expected {expected_reveal_round}"
def test_generate_commit_success():
uids = [1, 2, 3]
values = [10, 20, 30]
version_key = 42
tempo = 50
current_block = 500
netuid = 100
subnet_reveal_period_epochs = 2
block_time = 12
start_time = int(time.time())
ct_pybytes, reveal_round = get_encrypted_commit(
uids,
values,
version_key,
tempo,
current_block,
netuid,
subnet_reveal_period_epochs,
block_time,
)
assert (
ct_pybytes is not None and len(ct_pybytes) > 0
), "Ciphertext should not be empty"
assert reveal_round > 0, "Reveal round should be positive"
expected_reveal_round, expected_reveal_time, time_until_reveal = (
compute_expected_reveal_round(
start_time,
tempo,
current_block,
netuid,
subnet_reveal_period_epochs,
block_time,
)
)
assert (
abs(reveal_round - expected_reveal_round) <= 1
), f"Reveal round {reveal_round} differs from expected {expected_reveal_round}"
required_lead_time = SUBTENSOR_PULSE_DELAY * PERIOD
computed_reveal_time = (
GENESIS_TIME + (reveal_round + SUBTENSOR_PULSE_DELAY) * PERIOD
)
assert computed_reveal_time - start_time >= required_lead_time, (
"Not enough lead time before reveal. "
f"computed_reveal_time={computed_reveal_time}, start_time={start_time}, required={required_lead_time}"
)
assert (
time_until_reveal >= SUBTENSOR_PULSE_DELAY * PERIOD
), f"time_until_reveal {time_until_reveal} is less than required {SUBTENSOR_PULSE_DELAY * PERIOD}"
@pytest.mark.asyncio
async def test_generate_commit_various_tempos():
NETUID = 1
CURRENT_BLOCK = 100_000
SUBNET_REVEAL_PERIOD_EPOCHS = 1
BLOCK_TIME = 6
TEMPOS = [10, 50, 100, 250, 360, 500, 750, 1000]
uids = [0]
values = [100]
version_key = 1
for tempo in TEMPOS:
start_time = int(time.time())
ct_pybytes, reveal_round = get_encrypted_commit(
uids,
values,
version_key,
tempo,
CURRENT_BLOCK,
NETUID,
SUBNET_REVEAL_PERIOD_EPOCHS,
BLOCK_TIME,
)
assert len(ct_pybytes) > 0, f"Ciphertext is empty for tempo {tempo}"
assert reveal_round > 0, f"Reveal round is zero or negative for tempo {tempo}"
expected_reveal_round, _, time_until_reveal = compute_expected_reveal_round(
start_time,
tempo,
CURRENT_BLOCK,
NETUID,
SUBNET_REVEAL_PERIOD_EPOCHS,
BLOCK_TIME,
)
assert (
abs(reveal_round - expected_reveal_round) <= 1
), f"Tempo {tempo}: reveal_round {reveal_round} not close to expected {expected_reveal_round}"
computed_reveal_time = (
GENESIS_TIME + (reveal_round + SUBTENSOR_PULSE_DELAY) * PERIOD
)
required_lead_time = SUBTENSOR_PULSE_DELAY * PERIOD
assert computed_reveal_time - start_time >= required_lead_time, (
f"Tempo {tempo}: Not enough lead time: reveal_time={computed_reveal_time}, "
f"start_time={start_time}, required={required_lead_time}"
)
assert (
time_until_reveal >= SUBTENSOR_PULSE_DELAY * PERIOD
), f"Tempo {tempo}: time_until_reveal {time_until_reveal} is less than required {SUBTENSOR_PULSE_DELAY * PERIOD}"
def compute_expected_reveal_round(
now: int,
tempo: int,
current_block: int,
netuid: int,
subnet_reveal_period_epochs: int,
block_time: int,
):
tempo_plus_one = tempo + 1
netuid_plus_one = netuid + 1
block_with_offset = current_block + netuid_plus_one
current_epoch = block_with_offset // tempo_plus_one
# Initial guess for reveal_epoch
reveal_epoch = current_epoch + subnet_reveal_period_epochs
reveal_block_number = reveal_epoch * tempo_plus_one - netuid_plus_one
# Compute blocks_until_reveal, ensure non-negative
blocks_until_reveal = reveal_block_number - current_block
if blocks_until_reveal < 0:
blocks_until_reveal = 0
time_until_reveal = blocks_until_reveal * block_time
# Adjust until we have enough lead time (at least SUBTENSOR_PULSE_DELAY pulses * period seconds)
while time_until_reveal < SUBTENSOR_PULSE_DELAY * PERIOD:
reveal_epoch += 1
reveal_block_number = reveal_epoch * tempo_plus_one - netuid_plus_one
blocks_until_reveal = reveal_block_number - current_block
if blocks_until_reveal < 0:
blocks_until_reveal = 0
time_until_reveal = blocks_until_reveal * block_time
reveal_time = now + time_until_reveal
reveal_round = (
(reveal_time - GENESIS_TIME + PERIOD - 1) // PERIOD
) - SUBTENSOR_PULSE_DELAY
return reveal_round, reveal_time, time_until_reveal