Skip to content

Sync v1.1 (non-breaking changes) #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions src/millipds/repo_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import cbrrr

from atmst.blockstore import OverlayBlockStore, MemoryBlockStore
from atmst.blockstore import OverlayBlockStore, MemoryBlockStore, BlockStore
from atmst.mst.node_store import NodeStore
from atmst.mst.node_wrangler import NodeWrangler
from atmst.mst.node_walker import NodeWalker
Expand Down Expand Up @@ -101,6 +101,24 @@ def get_record(db: Database, did: str, path: str) -> Optional[bytes]:
)


# TODO: consider moving this into atmst
# used for figuring out the blocks needed for inductive proofs
class LoggingBlockStoreWrapper(BlockStore):
    """Pass-through BlockStore that remembers which keys were read.

    Every key handed to get_block() is added to ``self.gets`` before the
    lookup is delegated to the wrapped store, so a key is recorded even if
    the underlying lookup raises. Writes and deletes are forwarded
    unchanged and are not recorded.

    Intended for working out which blocks an MST operation touched (e.g.
    the blocks needed for inductive proofs).
    """

    def __init__(self, bs: BlockStore):
        # keys requested so far via get_block()
        # (presumably raw CID bytes - confirm against callers)
        self.gets = set()
        # the wrapped store that actually holds the blocks
        self.bs = bs

    def put_block(self, key: bytes, value: bytes) -> None:
        """Store ``value`` under ``key`` in the wrapped store (not logged)."""
        inner = self.bs
        inner.put_block(key, value)

    def get_block(self, key: bytes) -> bytes:
        """Record ``key`` as read, then return the wrapped store's block."""
        # record first, so the key is logged even when the lookup fails
        self.gets.add(key)
        block = self.bs.get_block(key)
        return block

    def del_block(self, key: bytes) -> None:
        """Delete ``key`` from the wrapped store (not logged)."""
        inner = self.bs
        inner.del_block(key)


# This is perhaps the most complex function in the whole codebase.
# There's probably some scope for refactoring, but I like the "directness" of it.
# The work it does is inherently complex, i.e. the atproto MST record commit logic
Expand Down Expand Up @@ -243,6 +261,7 @@ def apply_writes(
{
"cid": delta.later_value,
"path": delta.path,
"prev": delta.prior_value,
"action": "update",
}
)
Expand All @@ -265,7 +284,12 @@ def apply_writes(
)
)
firehose_ops.append(
{"cid": None, "path": delta.path, "action": "delete"}
{
"cid": None,
"path": delta.path,
"prev": delta.prior_value,
"action": "delete",
}
)
blob_decref_all(con, user_id, prior_value)
con.execute(
Expand All @@ -275,6 +299,25 @@ def apply_writes(
else:
raise Exception("unreachable")

# step 2b: compute inversion proof blocks
lbs = LoggingBlockStoreWrapper(bs)
lnw = NodeWrangler(
NodeStore(OverlayBlockStore(upper=MemoryBlockStore(), lower=lbs))
)
inversion_proof_root = next_commit_root
# TODO: what order should we use? see https://github.com/bluesky-social/proposals/issues/78
for op in firehose_ops[::-1]:
if op["action"] == "create":
inversion_proof_root = lnw.del_record(
inversion_proof_root, op["path"]
)
else:
inversion_proof_root = lnw.put_record(
inversion_proof_root, op["path"], op["prev"]
)
assert inversion_proof_root == prev_commit_root
inductive_proof_nodes = set(cbrrr.CID(cid) for cid in lbs.gets)

# step 3: persist MST changes (we have to do this *after* record_diff because it might need to read some old blocks from the db)
con.executemany(
"DELETE FROM mst WHERE repo=? AND cid=?",
Expand Down Expand Up @@ -312,7 +355,9 @@ def apply_writes(
car = io.BytesIO()
cw = util.CarWriter(car, commit_cid)
cw.write_block(commit_cid, commit_bytes)
for mst_cid in created | deletion_proof_cids:
# NOTE: in my testing, inductive_proof_nodes is always a superset of deletion_proof_cids,
# so we could probably avoid computing deletion_proof_cids explicitly.
for mst_cid in created | deletion_proof_cids | inductive_proof_nodes:
cw.write_block(mst_cid, bs.get_block(bytes(mst_cid)))
for record_cid in new_record_cids:
cw.write_block(record_cid, record_cbors[record_cid])
Expand All @@ -333,6 +378,7 @@ def apply_writes(
"commit": commit_cid,
"rebase": False, # deprecated but still required
"tooBig": False, # TODO: actually check lol
"prevData": prev_commit_root,
}
firehose_bytes = cbrrr.encode_dag_cbor(
{"t": "#commit", "op": 1}
Expand Down