Skip to content

Commit 4224496

Browse files
authored
Merge pull request #141 from Netflix-Skunkworks/vrayini/vertical-scale-param
Restrict Instance Type Param + Change Tier Target CPU Bounds
2 parents e2de141 + 6df3cb3 commit 4224496

File tree

2 files changed

+152
-16
lines changed

2 files changed

+152
-16
lines changed

service_capacity_modeling/models/org/netflix/kafka.py

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from service_capacity_modeling.interface import certain_int
1919
from service_capacity_modeling.interface import Clusters
2020
from service_capacity_modeling.interface import Consistency
21+
from service_capacity_modeling.interface import CurrentZoneClusterCapacity
2122
from service_capacity_modeling.interface import DataShape
2223
from service_capacity_modeling.interface import Drive
2324
from service_capacity_modeling.interface import GIB_IN_BYTES
@@ -47,13 +48,30 @@ def target_cpu_utilization(tier: int) -> float:
4748
"""
4849
Returns the target average cluster CPU utilization for a given tier
4950
"""
50-
if tier == 0:
51-
return 0.25
52-
if tier == 1:
53-
return 0.30
54-
if tier == 2:
55-
return 0.35
56-
return 0.40
51+
if tier in (0, 1):
52+
return 0.40
53+
return 0.50
54+
55+
56+
def _get_current_zonal_cluster(
57+
desires: CapacityDesires,
58+
) -> Optional[CurrentZoneClusterCapacity]:
59+
return (
60+
None
61+
if desires.current_clusters is None
62+
else (
63+
desires.current_clusters.zonal[0]
64+
if len(desires.current_clusters.zonal)
65+
else None
66+
)
67+
)
68+
69+
70+
def _is_same_instance_family(cluster, target_family):
71+
"""Check if the cluster's instance family matches the target family."""
72+
if not cluster or not cluster.cluster_instance:
73+
return False
74+
return cluster.cluster_instance.family == target_family
5775

5876

5977
def _estimate_kafka_requirement( # pylint: disable=too-many-positional-arguments
@@ -90,15 +108,7 @@ def _estimate_kafka_requirement( # pylint: disable=too-many-positional-argument
90108
write_mib_per_second
91109
)
92110
# use the current cluster capacity if available
93-
current_zonal_cluster = (
94-
None
95-
if desires.current_clusters is None
96-
else (
97-
desires.current_clusters.zonal[0]
98-
if len(desires.current_clusters.zonal)
99-
else None
100-
)
101-
)
111+
current_zonal_cluster = _get_current_zonal_cluster(desires)
102112

103113
if (
104114
current_zonal_cluster
@@ -230,6 +240,7 @@ def _estimate_kafka_cluster_zonal( # pylint: disable=too-many-positional-argume
230240
max_local_disk_gib: int = 1024 * 5,
231241
min_instance_cpu: int = 2,
232242
min_instance_memory_gib: int = 12,
243+
require_same_instance_family: bool = True,
233244
) -> Optional[CapacityPlan]:
234245

235246
# Kafka doesn't like to deploy on single CPU instances or with < 12 GiB of ram
@@ -248,6 +259,15 @@ def _estimate_kafka_cluster_zonal( # pylint: disable=too-many-positional-argume
248259
if instance.drive is None and drive.name != "gp3":
249260
return None
250261

262+
# If there is a current cluster, check if we are restricted to same instance family
263+
current_zonal_cluster = _get_current_zonal_cluster(desires)
264+
if (
265+
current_zonal_cluster
266+
and require_same_instance_family
267+
and not _is_same_instance_family(current_zonal_cluster, instance.family)
268+
):
269+
return None
270+
251271
requirement, regrets = _estimate_kafka_requirement(
252272
instance=instance,
253273
desires=desires,
@@ -461,6 +481,10 @@ def capacity_plan(
461481
required_zone_size: Optional[int] = extra_model_arguments.get(
462482
"required_zone_size", None
463483
)
484+
# By default, for existing clusters, restrict to only using same instance family
485+
require_same_instance_family: bool = extra_model_arguments.get(
486+
"require_same_instance_family", True
487+
)
464488

465489
return _estimate_kafka_cluster_zonal(
466490
instance=instance,
@@ -476,6 +500,7 @@ def capacity_plan(
476500
min_instance_cpu=min_instance_cpu,
477501
min_instance_memory_gib=min_instance_memory_gib,
478502
hot_retention_seconds=hot_retention_seconds,
503+
require_same_instance_family=require_same_instance_family,
479504
)
480505

481506
@staticmethod

tests/netflix/test_kafka.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@ def test_plan_certain_data_shape():
425425
"retention": "PT8H",
426426
"require_attached_disks": True,
427427
"required_zone_size": cluster_capacity.cluster_instance_count.mid,
428+
"require_same_instance_family": False,
428429
},
429430
)
430431

@@ -435,3 +436,113 @@ def test_plan_certain_data_shape():
435436
assert lr_clusters[0].count == cluster_capacity.cluster_instance_count.high
436437
for lr in cap_plan:
437438
print(lr.candidate_clusters.zonal[0])
439+
families = set(
440+
map(
441+
lambda curr_plan: curr_plan.candidate_clusters.zonal[0].instance.family,
442+
cap_plan,
443+
)
444+
)
445+
# check that we did not restrict the instance family to only r7a
446+
assert families != {"r7a"}
447+
448+
449+
def test_plan_certain_data_shape_same_instance_type():
450+
"""
451+
Use the current cluster's CPU utilization to determine instance types directly, as
452+
opposed to extrapolating it from the Data Shape
453+
"""
454+
cluster_capacity = CurrentZoneClusterCapacity(
455+
cluster_instance_name="r7a.4xlarge",
456+
cluster_drive=Drive(
457+
name="gp3",
458+
drive_type=DriveType.attached_ssd,
459+
size_gib=5000,
460+
block_size_kib=16,
461+
),
462+
cluster_instance_count=Interval(low=15, mid=15, high=15, confidence=1),
463+
cpu_utilization=Interval(
464+
low=5.441147804260254,
465+
mid=13.548842955300195,
466+
high=25.11203956604004,
467+
confidence=1,
468+
),
469+
memory_utilization_gib=Interval(low=0, mid=0, high=0, confidence=1),
470+
network_utilization_mbps=Interval(
471+
low=4580.919447446355,
472+
mid=19451.59814477331,
473+
high=42963.441154527085,
474+
confidence=1,
475+
),
476+
disk_utilization_gib=Interval(
477+
low=1341.579345703125,
478+
mid=1940.8741284013684,
479+
high=2437.607421875,
480+
confidence=1,
481+
),
482+
)
483+
484+
desires = CapacityDesires(
485+
service_tier=1,
486+
current_clusters=CurrentClusters(zonal=[cluster_capacity]),
487+
query_pattern=QueryPattern(
488+
access_pattern=AccessPattern(AccessPattern.latency),
489+
# 2 consumers
490+
estimated_read_per_second=Interval(low=2, mid=2, high=4, confidence=1),
491+
# 1 producer
492+
estimated_write_per_second=Interval(low=1, mid=1, high=1, confidence=0.98),
493+
estimated_mean_read_latency_ms=Interval(low=1, mid=1, high=1, confidence=1),
494+
estimated_mean_write_latency_ms=Interval(
495+
low=1, mid=1, high=1, confidence=1
496+
),
497+
estimated_mean_read_size_bytes=Interval(
498+
low=1024, mid=1024, high=1024, confidence=1
499+
),
500+
estimated_mean_write_size_bytes=Interval(
501+
low=125000000, mid=579000000, high=1351000000, confidence=0.98
502+
),
503+
estimated_read_parallelism=Interval(low=1, mid=1, high=1, confidence=1),
504+
estimated_write_parallelism=Interval(low=1, mid=1, high=1, confidence=1),
505+
read_latency_slo_ms=FixedInterval(low=0.4, mid=4, high=10, confidence=0.98),
506+
write_latency_slo_ms=FixedInterval(
507+
low=0.4, mid=4, high=10, confidence=0.98
508+
),
509+
),
510+
data_shape=DataShape(
511+
estimated_state_size_gib=Interval(
512+
low=44000, mid=86000, high=91000, confidence=1
513+
),
514+
),
515+
)
516+
517+
cap_plan = planner.plan_certain(
518+
model_name="org.netflix.kafka",
519+
region="us-east-1",
520+
num_results=3,
521+
num_regions=4,
522+
desires=desires,
523+
extra_model_arguments={
524+
"cluster_type": ClusterType.ha,
525+
"retention": "PT8H",
526+
"require_attached_disks": True,
527+
"required_zone_size": cluster_capacity.cluster_instance_count.mid,
528+
"require_same_instance_family": True,
529+
},
530+
)
531+
532+
assert len(cap_plan) >= 1
533+
lr_clusters = cap_plan[0].candidate_clusters.zonal
534+
assert len(lr_clusters) >= 1
535+
print(lr_clusters[0].instance.name)
536+
assert lr_clusters[0].count == cluster_capacity.cluster_instance_count.high
537+
538+
families = set(
539+
map(
540+
lambda curr_plan: curr_plan.candidate_clusters.zonal[0].instance.family,
541+
cap_plan,
542+
)
543+
)
544+
# check that we restricted the instance family to only r7a
545+
assert families == {"r7a"}
546+
547+
for lr in cap_plan:
548+
print(lr.candidate_clusters.zonal[0])

0 commit comments

Comments
 (0)