@@ -1464,20 +1464,58 @@ struct local_cache {
1464
1464
system_keyspace::bootstrap_state _state;
1465
1465
};
1466
1466
1467
+ future<> system_keyspace::peers_table_read_fixup () {
1468
+ assert (this_shard_id () == 0 );
1469
+ if (_peers_table_read_fixup_done) {
1470
+ co_return ;
1471
+ }
1472
+ _peers_table_read_fixup_done = true ;
1473
+
1474
+ const auto cql = format (" SELECT peer, host_id, WRITETIME(host_id) as ts from system.{}" , PEERS);
1475
+ std::unordered_map<utils::UUID, std::pair<net::inet_address, int64_t >> map{};
1476
+ const auto cql_result = co_await execute_cql (cql);
1477
+ for (const auto & row : *cql_result) {
1478
+ const auto peer = row.get_as <net::inet_address>(" peer" );
1479
+ if (!row.has (" host_id" )) {
1480
+ slogger.error (" Peer {} has no host_id in system.{}, the record is broken, removing it" ,
1481
+ peer, system_keyspace::PEERS);
1482
+ co_await remove_endpoint (gms::inet_address{peer});
1483
+ continue ;
1484
+ }
1485
+ const auto host_id = row.get_as <utils::UUID>(" host_id" );
1486
+ const auto ts = row.get_as <int64_t >(" ts" );
1487
+ const auto it = map.find (host_id);
1488
+ if (it == map.end ()) {
1489
+ map.insert ({host_id, {peer, ts}});
1490
+ continue ;
1491
+ }
1492
+ if (it->second .second >= ts) {
1493
+ slogger.error (" Peer {} with host_id {} has newer IP {} in system.{}, the record is stale, removing it" ,
1494
+ peer, host_id, it->second .first , system_keyspace::PEERS);
1495
+ co_await remove_endpoint (gms::inet_address{peer});
1496
+ } else {
1497
+ slogger.error (" Peer {} with host_id {} has newer IP {} in system.{}, the record is stale, removing it" ,
1498
+ it->second .first , host_id, peer, system_keyspace::PEERS);
1499
+ co_await remove_endpoint (gms::inet_address{it->second .first });
1500
+ it->second = {peer, ts};
1501
+ }
1502
+ }
1503
+ }
1504
+
1467
1505
future<std::unordered_map<gms::inet_address, locator::endpoint_dc_rack>> system_keyspace::load_dc_rack_info () {
1468
- auto msg = co_await execute_cql (format (" SELECT peer, data_center, rack from system.{}" , PEERS));
1506
+ co_await peers_table_read_fixup ();
1507
+
1508
+ const auto msg = co_await execute_cql (format (" SELECT peer, data_center, rack from system.{}" , PEERS));
1469
1509
1470
1510
std::unordered_map<gms::inet_address, locator::endpoint_dc_rack> ret;
1471
1511
for (const auto & row : *msg) {
1472
- net::inet_address peer = row.template get_as <net::inet_address>(" peer" );
1473
1512
if (!row.has (" data_center" ) || !row.has (" rack" )) {
1474
1513
continue ;
1475
1514
}
1476
- gms::inet_address gms_addr (std::move (peer));
1477
- sstring dc = row.template get_as <sstring>(" data_center" );
1478
- sstring rack = row.template get_as <sstring>(" rack" );
1479
-
1480
- ret.emplace (gms_addr, locator::endpoint_dc_rack{ dc, rack });
1515
+ ret.emplace (row.get_as <net::inet_address>(" peer" ), locator::endpoint_dc_rack {
1516
+ row.get_as <sstring>(" data_center" ),
1517
+ row.get_as <sstring>(" rack" )
1518
+ });
1481
1519
}
1482
1520
1483
1521
co_return ret;
@@ -1735,114 +1773,98 @@ static std::vector<cdc::generation_id_v2> decode_cdc_generations_ids(const set_t
1735
1773
return gen_ids_list;
1736
1774
}
1737
1775
1738
- static locator::host_id get_host_id (const gms::inet_address& peer, const cql3::untyped_result_set::row& row) {
1739
- locator::host_id host_id;
1740
- if (row.has (" host_id" )) {
1741
- host_id = locator::host_id (row.get_as <utils::UUID>(" host_id" ));
1742
- }
1743
- if (!host_id) {
1744
- slogger.warn (" Peer {} has no host_id in system.{}" , peer, system_keyspace::PEERS);
1745
- }
1746
- return host_id;
1747
- }
1748
-
1749
1776
future<std::unordered_map<gms::inet_address, std::unordered_set<dht::token>>> system_keyspace::load_tokens () {
1750
- sstring req = format (" SELECT peer, host_id, tokens FROM system.{}" , PEERS);
1751
- return execute_cql (req).then ([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
1752
- std::unordered_map<gms::inet_address, std::unordered_set<dht::token>> ret;
1753
- for (auto & row : *cql_result) {
1754
- auto peer = gms::inet_address (row.get_as <net::inet_address>(" peer" ));
1755
- if (!get_host_id (peer, row)) {
1756
- continue ;
1757
- }
1758
- if (row.has (" tokens" )) {
1759
- ret.emplace (peer, decode_tokens (deserialize_set_column (*peers (), row, " tokens" )));
1760
- }
1777
+ co_await peers_table_read_fixup ();
1778
+
1779
+ const sstring req = format (" SELECT peer, tokens FROM system.{}" , PEERS);
1780
+ std::unordered_map<gms::inet_address, std::unordered_set<dht::token>> ret;
1781
+ const auto cql_result = co_await execute_cql (req);
1782
+ for (const auto & row : *cql_result) {
1783
+ if (row.has (" tokens" )) {
1784
+ ret.emplace (gms::inet_address (row.get_as <net::inet_address>(" peer" )),
1785
+ decode_tokens (deserialize_set_column (*peers (), row, " tokens" )));
1761
1786
}
1762
- return ret;
1763
- }) ;
1787
+ }
1788
+ co_return ret ;
1764
1789
}
1765
1790
1766
1791
future<std::unordered_map<gms::inet_address, locator::host_id>> system_keyspace::load_host_ids () {
1767
- sstring req = format (" SELECT peer, host_id FROM system.{}" , PEERS);
1768
- return execute_cql (req).then ([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
1769
- std::unordered_map<gms::inet_address, locator::host_id> ret;
1770
- for (auto & row : *cql_result) {
1771
- auto peer = gms::inet_address (row.get_as <net::inet_address>(" peer" ));
1772
- if (auto host_id = get_host_id (peer, row)) {
1773
- ret.emplace (peer, host_id);
1774
- }
1775
- }
1776
- return ret;
1777
- });
1792
+ co_await peers_table_read_fixup ();
1793
+
1794
+ const sstring req = format (" SELECT peer, host_id FROM system.{}" , PEERS);
1795
+ std::unordered_map<gms::inet_address, locator::host_id> ret;
1796
+ const auto cql_result = co_await execute_cql (req);
1797
+ for (const auto & row : *cql_result) {
1798
+ ret.emplace (gms::inet_address (row.get_as <net::inet_address>(" peer" )),
1799
+ locator::host_id (row.get_as <utils::UUID>(" host_id" )));
1800
+ }
1801
+ co_return ret;
1778
1802
}
1779
1803
1780
1804
future<std::vector<gms::inet_address>> system_keyspace::load_peers () {
1781
- auto res = co_await execute_cql (format (" SELECT peer, host_id, tokens FROM system.{}" , PEERS));
1805
+ co_await peers_table_read_fixup ();
1806
+
1807
+ const auto res = co_await execute_cql (format (" SELECT peer, tokens FROM system.{}" , PEERS));
1782
1808
assert (res);
1783
1809
1784
1810
std::vector<gms::inet_address> ret;
1785
- for (auto & row: *res) {
1786
- auto peer = gms::inet_address (row.get_as <net::inet_address>(" peer" ));
1787
- if (!get_host_id (peer, row)) {
1788
- continue ;
1789
- }
1811
+ for (const auto & row: *res) {
1790
1812
if (!row.has (" tokens" )) {
1791
1813
// Ignore rows that don't have tokens. Such rows may
1792
1814
// be introduced by code that persists parts of peer
1793
1815
// information (such as RAFT_ID) which may potentially
1794
1816
// race with deleting a peer (during node removal).
1795
1817
continue ;
1796
1818
}
1797
- ret.emplace_back (peer);
1819
+ ret.emplace_back (gms::inet_address (row. get_as <net::inet_address>( " peer" )) );
1798
1820
}
1799
1821
co_return ret;
1800
1822
}
1801
1823
1802
1824
future<std::unordered_map<gms::inet_address, sstring>> system_keyspace::load_peer_features () {
1803
- sstring req = format (" SELECT peer, supported_features FROM system.{}" , PEERS);
1804
- return execute_cql (req).then ([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
1805
- std::unordered_map<gms::inet_address, sstring> ret;
1806
- for (auto & row : *cql_result) {
1807
- if (row.has (" supported_features" )) {
1808
- ret.emplace (row.get_as <net::inet_address>(" peer" ),
1809
- row.get_as <sstring>(" supported_features" ));
1810
- }
1825
+ co_await peers_table_read_fixup ();
1826
+
1827
+ const sstring req = format (" SELECT peer, supported_features FROM system.{}" , PEERS);
1828
+ std::unordered_map<gms::inet_address, sstring> ret;
1829
+ const auto cql_result = co_await execute_cql (req);
1830
+ for (const auto & row : *cql_result) {
1831
+ if (row.has (" supported_features" )) {
1832
+ ret.emplace (row.get_as <net::inet_address>(" peer" ),
1833
+ row.get_as <sstring>(" supported_features" ));
1811
1834
}
1812
- return ret;
1813
- }) ;
1835
+ }
1836
+ co_return ret ;
1814
1837
}
1815
1838
1816
1839
future<std::unordered_map<gms::inet_address, gms::inet_address>> system_keyspace::get_preferred_ips () {
1817
- sstring req = format (" SELECT peer, preferred_ip FROM system.{}" , PEERS);
1818
- return execute_cql (req).then ([] (::shared_ptr<cql3::untyped_result_set> cql_res_set) {
1819
- std::unordered_map<gms::inet_address, gms::inet_address> res;
1820
-
1821
- for (auto & r : *cql_res_set) {
1822
- if (r.has (" preferred_ip" )) {
1823
- res.emplace (gms::inet_address (r.get_as <net::inet_address>(" peer" )),
1824
- gms::inet_address (r.get_as <net::inet_address>(" preferred_ip" )));
1825
- }
1840
+ co_await peers_table_read_fixup ();
1841
+
1842
+ const sstring req = format (" SELECT peer, preferred_ip FROM system.{}" , PEERS);
1843
+ std::unordered_map<gms::inet_address, gms::inet_address> res;
1844
+
1845
+ const auto cql_result = co_await execute_cql (req);
1846
+ for (const auto & r : *cql_result) {
1847
+ if (r.has (" preferred_ip" )) {
1848
+ res.emplace (gms::inet_address (r.get_as <net::inet_address>(" peer" )),
1849
+ gms::inet_address (r.get_as <net::inet_address>(" preferred_ip" )));
1826
1850
}
1851
+ }
1827
1852
1828
- return res;
1829
- });
1853
+ co_return res;
1830
1854
}
1831
1855
1832
1856
namespace {
1833
1857
template <typename T>
1834
- static data_value_or_unset make_data_value_or_unset (const std::optional<T>& opt, bool & any_set ) {
1858
+ static data_value_or_unset make_data_value_or_unset (const std::optional<T>& opt) {
1835
1859
if (opt) {
1836
- any_set = true ;
1837
1860
return data_value (*opt);
1838
1861
} else {
1839
1862
return unset_value{};
1840
1863
}
1841
1864
};
1842
1865
1843
- static data_value_or_unset make_data_value_or_unset (const std::optional<std::unordered_set<dht::token>>& opt, bool & any_set ) {
1866
+ static data_value_or_unset make_data_value_or_unset (const std::optional<std::unordered_set<dht::token>>& opt) {
1844
1867
if (opt) {
1845
- any_set = true ;
1846
1868
auto set_type = set_type_impl::get_instance (utf8_type, true );
1847
1869
return make_set_value (set_type, prepare_tokens (*opt));
1848
1870
} else {
@@ -1851,29 +1873,30 @@ static data_value_or_unset make_data_value_or_unset(const std::optional<std::uno
1851
1873
};
1852
1874
}
1853
1875
1854
- future<> system_keyspace::update_peer_info (gms::inet_address ep, const peer_info& info) {
1876
+ future<> system_keyspace::update_peer_info (gms::inet_address ep, locator::host_id hid, const peer_info& info) {
1877
+ if (ep == gms::inet_address{}) {
1878
+ on_internal_error (slogger, format (" update_peer_info called with empty inet_address, host_id {}" , hid));
1879
+ }
1880
+ if (!hid) {
1881
+ on_internal_error (slogger, format (" update_peer_info called with empty host_id, ep {}" , ep));
1882
+ }
1855
1883
if (_db.get_token_metadata ().get_topology ().is_me (ep)) {
1856
1884
on_internal_error (slogger, format (" update_peer_info called for this node: {}" , ep));
1857
1885
}
1858
1886
1859
- bool any_set = false ;
1860
1887
data_value_list values = {
1861
1888
data_value_or_unset (data_value (ep.addr ())),
1862
- make_data_value_or_unset (info.data_center , any_set ),
1863
- make_data_value_or_unset (info. host_id , any_set ),
1864
- make_data_value_or_unset (info.preferred_ip , any_set ),
1865
- make_data_value_or_unset (info.rack , any_set ),
1866
- make_data_value_or_unset (info.release_version , any_set ),
1867
- make_data_value_or_unset (info.rpc_address , any_set ),
1868
- make_data_value_or_unset (info.schema_version , any_set ),
1869
- make_data_value_or_unset (info.tokens , any_set ),
1870
- make_data_value_or_unset (info.supported_features , any_set ),
1889
+ make_data_value_or_unset (info.data_center ),
1890
+ data_value_or_unset (hid. id ),
1891
+ make_data_value_or_unset (info.preferred_ip ),
1892
+ make_data_value_or_unset (info.rack ),
1893
+ make_data_value_or_unset (info.release_version ),
1894
+ make_data_value_or_unset (info.rpc_address ),
1895
+ make_data_value_or_unset (info.schema_version ),
1896
+ make_data_value_or_unset (info.tokens ),
1897
+ make_data_value_or_unset (info.supported_features ),
1871
1898
};
1872
1899
1873
- if (!any_set) {
1874
- co_return ;
1875
- }
1876
-
1877
1900
auto query = fmt::format (" INSERT INTO system.{} "
1878
1901
" (peer,data_center,host_id,preferred_ip,rack,release_version,rpc_address,schema_version,tokens,supported_features) VALUES"
1879
1902
" (?,?,?,?,?,?,?,?,?,?)" , PEERS);
@@ -1928,7 +1951,7 @@ future<> system_keyspace::update_schema_version(table_schema_version version) {
1928
1951
* Remove stored tokens being used by another node
1929
1952
*/
1930
1953
future<> system_keyspace::remove_endpoint (gms::inet_address ep) {
1931
- sstring req = format (" DELETE FROM system.{} WHERE peer = ?" , PEERS);
1954
+ const sstring req = format (" DELETE FROM system.{} WHERE peer = ?" , PEERS);
1932
1955
slogger.debug (" DELETE FROM system.{} WHERE peer = {}" , PEERS, ep);
1933
1956
co_await execute_cql (req, ep.addr ()).discard_result ();
1934
1957
}
0 commit comments