- Akash Kumar Shrivastva (as18464)
- Rishav Roy (rr4577)
This project simulates Replicated Concurrency Control and Recovery in a distributed database system. The objective is to implement a distributed database system with concurrency control and fault tolerance through concurrent transaction processing, data replication, and site failure simulation.
We leverage the following algorithms to achieve the objectives:
- Serializable Snapshot Isolation for concurrency control, with validation at commit time.
- Available Copies Algorithm for fault tolerance and recovery.
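The Available Copies rules can be illustrated with a small toy model. In this sketch, a read uses any one site that is up and holds the item, a write goes to every such site, and at commit time a transaction is rejected if a site it accessed failed after that access. `AvailableCopiesSketch` and all of its methods are hypothetical names for illustration; they are not the project's `SiteManager` or `TransactionManager` API.

```python
from typing import Dict, List, Optional


class AvailableCopiesSketch:
    """Toy model of the Available Copies rules (hypothetical, not the project's classes)."""

    def __init__(self, data_locations: Dict[str, List[int]]):
        self.data_locations = data_locations  # data_id -> sites holding a copy
        self.up: Dict[int, bool] = {s: True for sites in data_locations.values() for s in sites}
        self.fail_times: Dict[int, List[int]] = {s: [] for s in self.up}
        self.first_access: Dict[str, Dict[int, int]] = {}  # t_id -> {site_id: first access time}

    def fail(self, site_id: int, timestamp: int) -> None:
        self.up[site_id] = False
        self.fail_times[site_id].append(timestamp)

    def recover(self, site_id: int) -> None:
        # Only marks the site as up; replica readability after recovery is not modeled.
        self.up[site_id] = True

    def available_sites(self, data_id: str) -> List[int]:
        return [s for s in self.data_locations[data_id] if self.up[s]]

    def _record(self, t_id: str, site_id: int, timestamp: int) -> None:
        # Remember only the first time this transaction touched this site.
        self.first_access.setdefault(t_id, {}).setdefault(site_id, timestamp)

    def read_site(self, t_id: str, data_id: str, timestamp: int) -> Optional[int]:
        """Read from any one available copy; None means no copy is up and the read must wait."""
        sites = self.available_sites(data_id)
        if not sites:
            return None
        self._record(t_id, sites[0], timestamp)
        return sites[0]

    def write_sites(self, t_id: str, data_id: str, timestamp: int) -> List[int]:
        """Write to every available copy; an empty list means the write must wait."""
        sites = self.available_sites(data_id)
        for s in sites:
            self._record(t_id, s, timestamp)
        return sites

    def can_commit(self, t_id: str) -> bool:
        """Commit-time check: reject if any accessed site failed after the first access to it."""
        return not any(
            failed_at > accessed_at
            for s, accessed_at in self.first_access.get(t_id, {}).items()
            for failed_at in self.fail_times[s]
        )
```

For example, with `x2` replicated on sites 1 to 3, a transaction that writes `x2` at time 1 and then sees site 2 fail at time 2 fails `can_commit`, while a transaction that writes `x2` after the failure only touches sites 1 and 3 and passes.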
For a concise output, run:
python driver.py <input_file>
For a more verbose output, run:
python driver.py -v <input_file>
For running the program over a directory of input files, run:
./run.sh <input_directory> <output_directory>
The distributed database system is built around four key components that work together to ensure reliable and consistent data operations.
Driver: At the highest level, the Driver serves as the system's main controller and entry point, responsible for processing input commands and coordinating the overall flow of operations. It acts as an orchestrator, interpreting user instructions and directing them to the appropriate components.
Transaction Manager: The Transaction Manager acts as the core processing engine for all transaction-related operations. It maintains complete control over transaction lifecycles, from initiation through to either commitment or abortion. This component implements concurrency control mechanisms through conflict detection and maintains transaction states.
Site Manager: Working closely with the Transaction Manager, the Site Manager operates as the central coordinator for all distributed operations across multiple database sites. It maintains a comprehensive view of the entire distributed system, tracking the health and availability of each site and managing data distribution. When sites fail or recover, the Site Manager handles the necessary adjustments, including managing pending operations and ensuring data consistency across replicated sites.
Site: At the lowest level, each Site represents an individual database node that handles actual data storage and operations. Sites implement multi-version concurrency control and manage local read and write operations on their stored data. Each site maintains detailed version histories of its data items and provides snapshot isolation capabilities to ensure consistent reads.
The following UML diagram represents the components and data models used in the application:
```python
class Driver:
    """
    The Driver class serves as the main interface for the transaction processing system.
    It handles file input and delegates instructions to the TransactionManager (begin, read, write, end)
    and the SiteManager (fail, recover).
    """

    def __init__(self, verbose: bool = False):
        """
        Initialize Driver with optional verbose mode for detailed logging.
        Args:
            verbose (bool): Enable/disable detailed logging
        """
        self.site_manager: SiteManager
        self.transaction_manager: TransactionManager
        self.verbose: bool = verbose

    def read_file(self, file):
        """
        Read and process instructions from an input file.
        Each line should contain a transaction operation.
        Args:
            file: Input file containing transaction instructions
        """

    def process_line(self, line: str, timestamp: int):
        """
        Process a single instruction line from the input.
        Args:
            line (str): The instruction to process
            timestamp (int): Current line number
        """
```
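The Driver's job of turning one input line into a call on the right component can be sketched as below. The instruction syntax (`begin(T1)`, `R(T1,x2)`, `W(T1,x2,101)`, `end(T1)`, `fail(3)`, `recover(3)`, `dump()`) is an assumption, since this README does not show a sample input file. `parse_line` and `route` are hypothetical helpers, and `route` only names the target component and method instead of calling it. As with `process_line`, the timestamp is the line number.

```python
import re
from typing import List, Optional, Tuple

# Assumed instruction syntax (hypothetical; check against the project's sample inputs):
#   begin(T1)  R(T1,x2)  W(T1,x2,101)  end(T1)  fail(3)  recover(3)  dump()
INSTRUCTION = re.compile(r"^\s*(\w+)\s*\((.*)\)\s*$")


def parse_line(line: str) -> Optional[Tuple[str, List[str]]]:
    """Split one instruction into (operation, arguments); return None for blank lines."""
    if not line.strip():
        return None
    match = INSTRUCTION.match(line)
    if match is None:
        raise ValueError(f"unrecognized instruction: {line!r}")
    op, raw_args = match.groups()
    args = [arg.strip() for arg in raw_args.split(",") if arg.strip()]
    return op.lower(), args


def route(op: str, args: List[str], timestamp: int) -> Tuple[str, str, tuple]:
    """Name the component and method that would handle the instruction, plus its arguments."""
    if op in ("begin", "end"):
        return "TransactionManager", op, (args[0], timestamp)
    if op == "r":
        return "TransactionManager", "read", (args[0], args[1], timestamp)
    if op == "w":
        return "TransactionManager", "write", (args[0], args[1], int(args[2]), timestamp)
    if op in ("fail", "recover"):
        return "SiteManager", op, (int(args[0]), timestamp)
    if op == "dump":
        return "SiteManager", "dump", ()
    raise ValueError(f"unknown operation: {op}")


if __name__ == "__main__":
    lines = ["begin(T1)", "W(T1,x2,101)", "", "end(T1)"]
    for timestamp, line in enumerate(lines, start=1):  # timestamp = line number
        parsed = parse_line(line)
        if parsed is not None:
            print(route(*parsed, timestamp))
```

Keeping parsing separate from routing makes the instruction grammar easy to test on its own; a real `process_line` would invoke the named method on its `transaction_manager` or `site_manager` instead of returning a tuple.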
```python
class TransactionManager:
    """
    Manages transaction operations, conflict detection, and the transaction lifecycle.
    Coordinates with SiteManager for data operations across multiple sites.
    """

    def __init__(self):
        """
        Initialize TransactionManager with required components:
            - site_manager: Reference to the SiteManager instance
            - transaction_map: Maps transaction IDs to Transaction objects
            - conflict_graph: Tracks conflicts between transactions for serializable snapshot isolation
        """
        self.site_manager: SiteManager
        self.transaction_map: Dict[str, Transaction]
        self.conflict_graph: Dict[str, Dict[EdgeType, Set[str]]]

    def begin(self, t_id: str, timestamp: int):
        """
        Start a new transaction.
        Args:
            t_id (str): Transaction identifier
            timestamp (int): Start timestamp
        """

    def read(self, t_id: str, data_id: str, timestamp: int, is_pending_read: bool = False) -> int:
        """
        Execute a read operation for a transaction.
        Args:
            t_id (str): Transaction identifier
            data_id (str): Data item to read
            timestamp (int): Operation timestamp
            is_pending_read (bool): Whether this is a pending read operation
        Returns:
            int: Value read from the data item
        """

    def write(self, t_id: str, data_id: str, value: int, timestamp: int, is_pending: bool = False):
        """
        Execute a write operation for a transaction.
        Args:
            t_id (str): Transaction identifier
            data_id (str): Data item to write
            value (int): Value to write
            timestamp (int): Operation timestamp
            is_pending (bool): Whether this is a pending write operation
        """

    def clears_site_failure_check(self, t_id: str) -> bool:
        """
        Check whether the transaction must abort under the Available Copies rules.
        Args:
            t_id (str): Transaction identifier
        Returns:
            bool: True if the transaction can proceed
        """

    def clears_first_committer_rule_check(self, t_id: str) -> bool:
        """
        Check whether the transaction satisfies the first-committer-wins rule of serializable snapshot isolation.
        Args:
            t_id (str): Transaction identifier
        Returns:
            bool: True if the rule is satisfied
        """

    def abort_transaction(self, abort_type: AbortType, t_id: str, data_id: str, site_id: int):
        """
        Abort a transaction for the specified reason.
        Args:
            abort_type (AbortType): Reason for the abort
            t_id (str): Transaction identifier
            data_id (str): Related data item
            site_id (int): Related site ID
        """

    def end(self, t_id: str, timestamp: int):
        """
        Run the commit-time checks, then commit or abort the transaction.
        Args:
            t_id (str): Transaction identifier
            timestamp (int): End timestamp
        """

    def exec_pending(self, site_id: int, timestamp: int):
        """
        Execute pending operations (read, write) after site recovery.
        Args:
            site_id (int): Site identifier
            timestamp (int): Current timestamp
        """
```
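The two SSI validations that `end` relies on can be sketched as standalone functions. This is a minimal illustration under stated assumptions, not the project's code. First-committer-wins rejects a transaction if another transaction committed a write to one of its written items after it started. The graph check flags a simple cycle that contains two consecutive read-write (`rw`) anti-dependency edges. The graph uses the same shape as `conflict_graph`, but plain strings `"ww"`, `"wr"`, `"rw"` stand in for the project's `EdgeType` members, and `last_commit_ts` (item to latest commit time) is a hypothetical input.

```python
from typing import Dict, Iterable, Optional, Set

# Same shape as conflict_graph (t_id -> {edge type -> successor t_ids}), with plain
# strings "ww", "wr", "rw" standing in for the project's EdgeType (assumption).
Graph = Dict[str, Dict[str, Set[str]]]


def first_committer_wins_ok(start_ts: int, write_set: Iterable[str],
                            last_commit_ts: Dict[str, int]) -> bool:
    """False if another transaction committed a write to any item in write_set after start_ts."""
    return all(last_commit_ts.get(data_id, -1) <= start_ts for data_id in write_set)


def successors(graph: Graph, node: str, edge_type: Optional[str] = None) -> Set[str]:
    """Successors of node, optionally restricted to a single edge type."""
    edges = graph.get(node, {})
    if edge_type is not None:
        return set(edges.get(edge_type, set()))
    return set().union(*edges.values())


def reachable(graph: Graph, src: str, dst: str, banned: str) -> bool:
    """Iterative DFS from src to dst that never passes through `banned`."""
    stack, seen = [src], {src}
    while stack:
        node = stack.pop()
        if node == dst:
            return True
        for nxt in successors(graph, node):
            if nxt not in seen and nxt != banned:
                seen.add(nxt)
                stack.append(nxt)
    return False


def has_consecutive_rw_cycle(graph: Graph) -> bool:
    """True if some simple cycle contains consecutive edges a -rw-> b -rw-> c."""
    for a in graph:
        for b in successors(graph, a, "rw"):
            for c in successors(graph, b, "rw"):
                # A path c -> ... -> a that avoids b closes a simple cycle through a, b, c.
                if a != b and b != c and reachable(graph, c, a, banned=b):
                    return True
    return False
```

In this sketch, a committing transaction would be checked with `first_committer_wins_ok` on its write set and with `has_consecutive_rw_cycle` on the graph after adding its own edges; failing either check would lead to an abort.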
```python
class SiteManager:
    """
    Manages multiple sites in the distributed database system.
    Handles site failures, recovery, and data distribution.
    """

    def __init__(self):
        """
        Initialize SiteManager with:
            - sites: Dictionary mapping site IDs to Site objects
            - site_status: Status information for each site
            - pending_reads/pending_writes: Track pending operations during site failures
            - data_locations: Track which data items are stored at which sites
        """
        self.sites: Dict[int, Site]
        self.site_status: Dict[int, SiteStatus]
        self.pending_reads: Dict[str, Set[Tuple[int, str, int]]]
        self.pending_writes: Dict[str, Set[Tuple[int, str, int]]]
        self.data_locations: Dict[str, List[int]]

    def get_available_sites(self, data_id: str) -> List[int]:
        """
        Get the list of available sites containing the specified data item.
        Args:
            data_id (str): Data item identifier
        Returns:
            List[int]: List of available site IDs
        """

    def get_site(self, site_id: int) -> Site:
        """
        Get a specific site instance.
        Args:
            site_id (int): Site identifier
        Returns:
            Site: Site instance
        """

    def commit(self, transaction: Transaction, timestamp: int):
        """
        Commit transaction changes to all relevant sites.
        Args:
            transaction (Transaction): Transaction to commit
            timestamp (int): Commit timestamp
        """

    def fail(self, site_id: int, timestamp: int):
        """
        Handle a site failure.
        Args:
            site_id (int): Failed site identifier
            timestamp (int): Failure timestamp
        """

    def recover(self, site_id: int, timestamp: int):
        """
        Handle a site recovery.
        Args:
            site_id (int): Recovered site identifier
            timestamp (int): Recovery timestamp
        """

    def dump(self):
        """Output the current state of all sites for debugging."""
```
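The pending-operation bookkeeping that `SiteManager` keeps during failures can be sketched as a small queue. This is a hypothetical illustration, not the project's implementation: it queues a read when no site holding the item is up, and when a site recovers it releases, in original timestamp order, the queued reads for items that site stores, so that something like `exec_pending` could replay them. The queue shape (item to list of `(t_id, timestamp)`) is an assumption and differs from the `Set[Tuple[int, str, int]]` type listed for `pending_reads`; pending writes would follow the same pattern.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple


class PendingOpsSketch:
    """Hypothetical queue of reads blocked because no copy of the item was available."""

    def __init__(self, data_locations: Dict[str, List[int]]):
        self.data_locations = data_locations  # data_id -> sites storing it
        self.down: Set[int] = set()
        # data_id -> [(t_id, timestamp), ...] reads waiting for a site that stores data_id
        self.pending_reads: Dict[str, List[Tuple[str, int]]] = defaultdict(list)

    def available_sites(self, data_id: str) -> List[int]:
        return [s for s in self.data_locations[data_id] if s not in self.down]

    def read_or_wait(self, t_id: str, data_id: str, timestamp: int) -> bool:
        """Return True if the read can run now; otherwise queue it and return False."""
        if self.available_sites(data_id):
            return True
        self.pending_reads[data_id].append((t_id, timestamp))
        return False

    def fail(self, site_id: int) -> None:
        self.down.add(site_id)

    def recover(self, site_id: int) -> List[Tuple[str, str, int]]:
        """Bring the site back and release queued reads for items it stores."""
        self.down.discard(site_id)
        released: List[Tuple[str, str, int]] = []
        for data_id, waiting in list(self.pending_reads.items()):
            if site_id in self.data_locations[data_id]:
                released.extend((t_id, data_id, ts) for t_id, ts in waiting)
                del self.pending_reads[data_id]
        return sorted(released, key=lambda op: op[2])  # replay in original order
```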
```python
class Site:
    """
    Represents a single database site in the distributed system.
    Manages data storage, versioning, and operations for its local data items.
    """

    def __init__(self, site_id: int):
        """
        Initialize a database site.
        Args:
            site_id (int): Unique identifier for the site
        Attributes:
            site_id (int): Site identifier
            data_store (Dict[str, List[DataLog]]): Current data versions
            data_history (Dict[str, List[DataLog]]): Historical data versions
        """
        self.site_id: int = site_id
        self.data_store: Dict[str, List[DataLog]] = {}
        self.data_history: Dict[str, List[DataLog]] = {}

    def initialize_site_data(self):
        """
        Initialize the site with its designated data items.
        Each site is responsible for certain data items based on its site_id.
        """

    def get_value_using_snapshot_isolation(self, data_id: str, timestamp: int) -> int:
        """
        Retrieve a data value using snapshot isolation.
        Args:
            data_id (str): Identifier of the data item
            timestamp (int): Timestamp for snapshot isolation
        Returns:
            int: Value of the data item at the specified timestamp
        """

    def read(self, data_id: str, timestamp: int) -> int:
        """
        Read a data item's value using snapshot isolation by calling self.get_value_using_snapshot_isolation.
        Args:
            data_id (str): Identifier of the data item
            timestamp (int): Transaction start time
        Returns:
            int: Value of the data item
        """

    def write(self, t_id: str, data_id: str, value: int, timestamp: int):
        """
        Write a new value to a data item.
        Args:
            t_id (str): Transaction identifier
            data_id (str): Identifier of the data item
            value (int): Value to write
            timestamp (int): Write timestamp
        """

    def persist(self, t_id: str, data_id: str, timestamp: int):
        """
        Persist a written value to the data store.
        Args:
            t_id (str): Transaction identifier
            data_id (str): Identifier of the data item
            timestamp (int): Commit timestamp
        """

    def dump(self):
        """
        Output the current state of all data items at this site.
        Used for debugging and monitoring.
        """
```
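The multi-version storage behind `read`, `write`, and `persist` can be sketched as follows. This is a minimal sketch under assumptions, not the project's `Site`: `Version` is a hypothetical stand-in for `DataLog`, initial values are treated as committed at time 0, and a snapshot read returns the newest version committed at or before the reader's start time. Buffered writes stay invisible to every reader until `persist` installs them with a commit timestamp; a transaction's own uncommitted writes are not consulted here.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Version:
    """Hypothetical stand-in for DataLog: one committed value and its commit time."""
    value: int
    commit_ts: int


class SiteSketch:
    """Multi-version storage for one site; not the project's actual Site class."""

    def __init__(self, initial_values: Dict[str, int]):
        # Assumption: initial values count as committed at time 0.
        self.versions: Dict[str, List[Version]] = {
            data_id: [Version(value, 0)] for data_id, value in initial_values.items()
        }
        # Uncommitted writes keyed by (t_id, data_id), held until persist().
        self.uncommitted: Dict[Tuple[str, str], int] = {}

    def read(self, data_id: str, start_ts: int) -> int:
        """Snapshot read: newest version committed at or before the reader's start time."""
        for version in reversed(self.versions[data_id]):  # appended in commit order
            if version.commit_ts <= start_ts:
                return version.value
        raise LookupError(f"no version of {data_id} visible at time {start_ts}")

    def write(self, t_id: str, data_id: str, value: int) -> None:
        """Buffer a write; no reader sees it until it is persisted."""
        self.uncommitted[(t_id, data_id)] = value

    def persist(self, t_id: str, data_id: str, commit_ts: int) -> None:
        """Install the buffered value as a new committed version at commit_ts."""
        value = self.uncommitted.pop((t_id, data_id))
        self.versions[data_id].append(Version(value, commit_ts))
```

Keeping every committed version, rather than overwriting in place, is what lets a transaction that started before a commit keep reading the older value.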