Skip to content

Commit f187856

Browse files
committed
enhance: add resource group declarative api
Signed-off-by: chyezh <chyezh@outlook.com>
1 parent b9a10c9 commit f187856

13 files changed

+368
-60
lines changed

examples/resource_group.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from pymilvus import utility, connections, DEFAULT_RESOURCE_GROUP
1+
from pymilvus import utility, connections
2+
from pymilvus.client.constants import DEFAULT_RESOURCE_GROUP
23
from example import *
34

45
_HOST = '127.0.0.1'
@@ -52,7 +53,7 @@ def transfer_replica(source, target, collection_name, num_replica):
5253
f"transfer {num_replica} replicas in {collection_name} from {source} to {target}")
5354
utility.transfer_replica(
5455
source, target, collection_name, num_replica, using=_CONNECTION_NAME)
55-
56+
5657
def run():
5758
create_connection("root", "123456")
5859
coll = create_collection(_COLLECTION_NAME, _ID_FIELD_NAME, _VECTOR_FIELD_NAME)
+153
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
from pymilvus import utility, connections, Collection
2+
from pymilvus.client.constants import DEFAULT_RESOURCE_GROUP
3+
from pymilvus.client.types import ResourceGroupConfig
4+
from typing import List
5+
from example import create_connection, create_collection, insert, create_index
6+
7+
_PENDING_NODES_RESOURCE_GROUP="pending_nodes"
8+
# Vector parameters
9+
_DIM = 128
10+
_COLLECTION_NAME = 'rg_declarative_demo'
11+
_ID_FIELD_NAME = 'id_field'
12+
_VECTOR_FIELD_NAME = 'float_vector_field'
13+
14+
def create_example_collection_and_load(replica_number: int, resource_groups: List[str]):
    """Build the demo collection and load it onto the given resource groups.

    The collection is created, filled, flushed and indexed through the helpers
    imported from ``example``; it is then loaded with ``replica_number``
    replicas, passing ``resource_groups`` as ``_resource_groups``.
    """
    print("\nCreate collection and load...")
    demo_collection = create_collection(_COLLECTION_NAME, _ID_FIELD_NAME, _VECTOR_FIELD_NAME)
    # presumably 10000 entities of dimension _DIM -- confirm against example.insert
    insert(demo_collection, 10000, _DIM)
    demo_collection.flush()
    create_index(demo_collection, _VECTOR_FIELD_NAME)
    demo_collection.load(replica_number=replica_number, _resource_groups=resource_groups)
21+
22+
def transfer_replica(src: str, dest: str, num_replica: int):
    """Ask Milvus to move ``num_replica`` replicas of the demo collection from ``src`` to ``dest``."""
    utility.transfer_replica(
        source_group=src,
        target_group=dest,
        collection_name=_COLLECTION_NAME,
        num_replicas=num_replica,
    )
24+
25+
def list_replica():
    """Print whatever ``Collection.get_replicas`` reports for the demo collection."""
    print(Collection(name=_COLLECTION_NAME).get_replicas())
29+
30+
def init_cluster(node_num: int):
    """Create the pending resource group and pin the default group to ``node_num`` nodes.

    Two calls are made, in this order:

    1. ``utility.create_resource_group`` for the pending group, which requests
       0 nodes and is limited to 10000 nodes.
    2. ``utility.update_resource_groups`` for the default group, with both
       ``requests`` and ``limits`` set to ``node_num`` and the pending group
       listed in ``transfer_from`` and ``transfer_to``.
    """
    print(f"Init cluster with {node_num} nodes, all nodes will be put in default resource group")

    # The pending group is a parking place for nodes that hold no data:
    # it requests nothing, so no data is loaded on it, and accepts up to 10000 nodes.
    pending_config = ResourceGroupConfig(
        requests={"node_num": 0},
        limits={"node_num": 10000},
    )
    utility.create_resource_group(name=_PENDING_NODES_RESOURCE_GROUP, config=pending_config)

    # The default group holds all of the initial nodes.
    default_config = ResourceGroupConfig(
        requests={"node_num": node_num},
        limits={"node_num": node_num},
        # missing nodes are recovered from the pending group (high priority)
        transfer_from=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}],
        # redundant nodes are handed over to the pending group (low priority)
        transfer_to=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}],
    )
    utility.update_resource_groups({DEFAULT_RESOURCE_GROUP: default_config})
46+
47+
def list_all_resource_groups():
    """Print the description of each group named by ``utility.list_resource_groups()``."""
    for group_name in utility.list_resource_groups():
        print(utility.describe_resource_group(group_name))
54+
55+
def scale_resource_group_to(name: str, node_num: int):
    """Declare that resource group ``name`` should hold exactly ``node_num`` nodes.

    Only the declared config is updated here; any additional query nodes have
    to be added to the cluster by an outside orchestration system.
    """
    target_config = ResourceGroupConfig(
        requests={"node_num": node_num},
        limits={"node_num": node_num},
        # missing nodes are recovered from the pending group (high priority)
        transfer_from=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}],
        # redundant nodes are handed over to the pending group (low priority)
        transfer_to=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}],
    )
    utility.update_resource_groups({name: target_config})
65+
66+
def create_resource_group(name: str, node_num: int):
    """Create resource group ``name`` with ``requests`` and ``limits`` both set to ``node_num``.

    The pending group is listed in both ``transfer_from`` and ``transfer_to``.
    """
    print(f"Create resource group {name} with {node_num} nodes")
    new_group_config = ResourceGroupConfig(
        requests={"node_num": node_num},
        limits={"node_num": node_num},
        # missing nodes are recovered from the pending group (high priority)
        transfer_from=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}],
        # redundant nodes are handed over to the pending group (low priority)
        transfer_to=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}],
    )
    utility.create_resource_group(name, config=new_group_config)
74+
75+
def resource_group_management():
    """Walk through the declarative resource-group workflow, printing all groups after each step.

    The ``create_resource_group(...)`` and ``scale_out(...)`` steps appear only
    as comments: they describe actions that are not executed by this function.
    The "expected" comments record the node count the example anticipates for
    each group at that point.
    """
    # Step 1: one node in the default group, none in the pending group.
    init_cluster(1)
    list_all_resource_groups()
    # expected:
    #   DEFAULT_RESOURCE_GROUP: 1
    #   _PENDING_NODES_RESOURCE_GROUP: 0

    # Step 2: declare rg1 with two nodes; both are missing at first.
    # create_resource_group("rg1", 2)
    list_all_resource_groups()
    # expected:
    #   DEFAULT_RESOURCE_GROUP: 1
    #   _PENDING_NODES_RESOURCE_GROUP: 0
    #   rg1: 0 (missing 2)

    # Step 3: the orchestration system adds two query nodes to the cluster;
    # they are assigned to rg1 automatically.
    # scale_out(2)
    list_all_resource_groups()
    # expected:
    #   DEFAULT_RESOURCE_GROUP: 1
    #   _PENDING_NODES_RESOURCE_GROUP: 0
    #   rg1: 2

    # Step 4: grow rg1 to three nodes; one node is now missing.
    scale_resource_group_to("rg1", 3)
    list_all_resource_groups()
    # expected:
    #   DEFAULT_RESOURCE_GROUP: 1
    #   _PENDING_NODES_RESOURCE_GROUP: 0
    #   rg1: 2 (missing 1)

    # Step 5: the orchestration system adds two more query nodes; one fills rg1,
    # the redundant one lands in the pending group.
    # scale_out(2)
    list_all_resource_groups()
    # expected:
    #   DEFAULT_RESOURCE_GROUP: 1
    #   _PENDING_NODES_RESOURCE_GROUP: 1
    #   rg1: 3

    # Step 6: shrink rg1 to a single node.
    scale_resource_group_to("rg1", 1)
    list_all_resource_groups()
    # expected:
    #   DEFAULT_RESOURCE_GROUP: 1
    #   _PENDING_NODES_RESOURCE_GROUP: 3
    #   rg1: 1

    # Step 7: declare rg2 with three nodes; they are taken from the pending group.
    # create_resource_group("rg2", 3)
    list_all_resource_groups()
    # expected:
    #   DEFAULT_RESOURCE_GROUP: 1
    #   _PENDING_NODES_RESOURCE_GROUP: 0
    #   rg1: 1
    #   rg2: 3

    # Step 8: grow the default group to five nodes; four are missing.
    scale_resource_group_to(DEFAULT_RESOURCE_GROUP, 5)
    list_all_resource_groups()
    # expected:
    #   DEFAULT_RESOURCE_GROUP: 1 (missing 4)
    #   _PENDING_NODES_RESOURCE_GROUP: 0
    #   rg1: 1
    #   rg2: 3

    # Step 9: the orchestration system adds four query nodes.
    # scale_out(4)
    list_all_resource_groups()
    # expected (as stated by the original example):
    #   DEFAULT_RESOURCE_GROUP: 5
    #   _PENDING_NODES_RESOURCE_GROUP: 1
    #   rg1: 1
    #   rg2: 3
    # NOTE(review): with 4 nodes added and 4 missing, a pending count of 0 would
    # be expected here -- confirm the intended value.
139+
140+
def replica_management():
    """Move replicas of the demo collection out of the default resource group.

    Loading the collection into the default group is left commented out, so
    this function only issues the two transfer requests.
    """
    # Load the collection into the default group, one replica per node:
    # create_example_collection_and_load(5, [DEFAULT_RESOURCE_GROUP])

    for destination, replica_count in (("rg1", 1), ("rg2", 2)):
        transfer_replica(DEFAULT_RESOURCE_GROUP, destination, replica_count)

    # expected placement afterwards:
    #   DEFAULT_RESOURCE_GROUP: 2 replicas on 5 nodes
    #   rg1: 1 replica on 1 node
    #   rg2: 2 replicas on 3 nodes
149+
150+
if __name__ == "__main__":
    # Connect first, then run the resource-group walkthrough followed by the
    # replica walkthrough.
    create_connection()
    resource_group_management()
    replica_management()

pymilvus/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
from .orm.schema import CollectionSchema, FieldSchema
5858
from .orm.utility import (
5959
create_resource_group,
60+
update_resource_groups,
6061
create_user,
6162
delete_user,
6263
describe_resource_group,
@@ -126,6 +127,7 @@
126127
"MilvusUnavailableException",
127128
"BulkInsertState",
128129
"create_resource_group",
130+
"update_resource_groups",
129131
"drop_resource_group",
130132
"describe_resource_group",
131133
"list_resource_groups",

pymilvus/client/grpc_handler.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from pathlib import Path
77
from typing import Any, Callable, Dict, List, Optional, Union
88
from urllib import parse
9+
from collections.abc import Mapping
910

1011
import grpc
1112
from grpc._cython import cygrpc
@@ -50,6 +51,7 @@
5051
Plan,
5152
Replica,
5253
ResourceGroupInfo,
54+
ResourceGroupConfig,
5355
RoleInfo,
5456
Shard,
5557
State,
@@ -1835,10 +1837,16 @@ def get_server_version(self, timeout: Optional[float] = None, **kwargs) -> str:
18351837

18361838
@retry_on_rpc_failure()
18371839
def create_resource_group(self, name: str, timeout: Optional[float] = None, **kwargs):
1838-
req = Prepare.create_resource_group(name)
1840+
req = Prepare.create_resource_group(name, **kwargs)
18391841
resp = self._stub.CreateResourceGroup(req, wait_for_ready=True, timeout=timeout)
18401842
check_status(resp)
18411843

1844+
@retry_on_rpc_failure()
def update_resource_groups(
    self,
    configs: Mapping[str, ResourceGroupConfig],
    timeout: Optional[float] = None,
    **kwargs,
):
    """Send an ``UpdateResourceGroups`` RPC built from ``configs``.

    The request comes from ``Prepare.update_resource_groups`` and the response
    is handed to ``check_status``. ``kwargs`` is accepted but not used here.
    """
    request = Prepare.update_resource_groups(configs)
    response = self._stub.UpdateResourceGroups(request, wait_for_ready=True, timeout=timeout)
    check_status(response)
1849+
18421850
@retry_on_rpc_failure()
18431851
def drop_resource_group(self, name: str, timeout: Optional[float] = None, **kwargs):
18441852
req = Prepare.drop_resource_group(name)

pymilvus/client/prepare.py

+18-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import base64
22
import datetime
33
from typing import Any, Dict, Iterable, List, Optional, Union
4+
from collections.abc import Mapping
45

56
import ujson
67

@@ -19,7 +20,12 @@
1920
ITERATOR_FIELD,
2021
REDUCE_STOP_FOR_BEST,
2122
)
22-
from .types import DataType, PlaceholderType, get_consistency_level
23+
from .types import (
24+
DataType,
25+
PlaceholderType,
26+
ResourceGroupConfig,
27+
get_consistency_level,
28+
)
2329
from .utils import traverse_info, traverse_rows_info
2430

2531

@@ -1151,9 +1157,18 @@ def get_server_version(cls):
11511157
return milvus_types.GetVersionRequest()
11521158

11531159
@classmethod
1154-
def create_resource_group(cls, name: str):
1160+
def create_resource_group(cls, name: str, **kwargs):
11551161
check_pass_param(resource_group_name=name)
1156-
return milvus_types.CreateResourceGroupRequest(resource_group=name)
1162+
return milvus_types.CreateResourceGroupRequest(
1163+
resource_group=name,
1164+
config=kwargs.get("config"),
1165+
)
1166+
1167+
@classmethod
def update_resource_groups(cls, configs: Mapping[str, ResourceGroupConfig]):
    """Build an ``UpdateResourceGroupsRequest`` whose ``resource_groups`` field is ``configs``."""
    request = milvus_types.UpdateResourceGroupsRequest(resource_groups=configs)
    return request
11571172

11581173
@classmethod
11591174
def drop_resource_group(cls, name: str):

pymilvus/client/stub.py

+11
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from pymilvus.decorators import deprecated
44
from pymilvus.exceptions import MilvusException, ParamError
55
from pymilvus.settings import Config
6+
from collections.abc import Mapping
67

78
from .check import is_legal_host, is_legal_port
89
from .grpc_handler import GrpcHandler
@@ -12,6 +13,7 @@
1213
CompactionState,
1314
Replica,
1415
ResourceGroupInfo,
16+
ResourceGroupConfig,
1517
)
1618

1719

@@ -1380,6 +1382,15 @@ def create_resource_group(self, name, timeout=None, **kwargs):
13801382
"""
13811383
with self._connection() as handler:
13821384
handler.create_resource_group(name, timeout=timeout, **kwargs)
1385+
1386+
def update_resource_groups(self, configs: Mapping[str, ResourceGroupConfig], timeout=None, **kwargs):
    """Update resource groups with the given configs.

    :param configs: resource group configs
    :type configs: Mapping[str, ResourceGroupConfig]

    :param timeout: forwarded unchanged to the handler's ``update_resource_groups``
    :type timeout: float or None
    """
    # Extra keyword arguments are passed through to the handler as-is.
    with self._connection() as handler:
        handler.update_resource_groups(configs=configs, timeout=timeout, **kwargs)
13831394

13841395
def drop_resource_group(self, name, timeout=None, **kwargs):
13851396
"""drop resource group with specific name

0 commit comments

Comments
 (0)