Commit 27f9e53

Merge pull request #139 from Netflix-Skunkworks/vrayini/kafka-fix-replication
Fix Kafka Capacity Model Replication Factor
2 parents: 342e662 + ba8899c

2 files changed: +112, -7 lines

service_capacity_modeling/models/org/netflix/kafka.py

Lines changed: 15 additions & 6 deletions

@@ -159,7 +159,7 @@ def _estimate_kafka_requirement( # pylint: disable=too-many-positional-argument
     needed_network_mbps = max(bw_in, bw_out) * 1.40

     needed_disk = math.ceil(
-        desires.data_shape.estimated_state_size_gib.mid * copies_per_region,
+        desires.data_shape.estimated_state_size_gib.mid,
     )

     # Keep the last N seconds hot in cache

@@ -416,6 +416,10 @@ class NflxKafkaArguments(BaseModel):


 class NflxKafkaCapacityModel(CapacityModel):
+
+    HA_DEFAULT_REPLICATION_FACTOR = 2
+    SC_DEFAULT_REPLICATION_FACTOR = 3
+
     @staticmethod
     def capacity_plan(
         instance: Instance,

@@ -427,11 +431,11 @@ def capacity_plan(
         cluster_type: ClusterType = ClusterType(
             extra_model_arguments.get("cluster_type", "high-availability")
         )
-        default_replication = 2
+        replication_factor = NflxKafkaCapacityModel.HA_DEFAULT_REPLICATION_FACTOR
         if cluster_type == ClusterType.strong:
-            default_replication = 3
+            replication_factor = NflxKafkaCapacityModel.SC_DEFAULT_REPLICATION_FACTOR
         copies_per_region: int = extra_model_arguments.get(
-            "copies_per_region", default_replication
+            "copies_per_region", replication_factor
         )
         if cluster_type == ClusterType.strong and copies_per_region < 3:
             raise ValueError("Strong consistency and RF<3 doesn't work")

@@ -503,8 +507,13 @@ def default_desires(user_desires, extra_model_arguments: Dict[str, Any]):
         retention = extra_model_arguments.get("retention", "PT8H")
         retention_secs = iso_to_seconds(retention)

-        # write throughput * retention = usage
-        state_gib = (write_bytes.mid * retention_secs) / GIB_IN_BYTES
+        # write throughput * retention * replication factor = usage
+        replication_factor = NflxKafkaCapacityModel.HA_DEFAULT_REPLICATION_FACTOR
+        if extra_model_arguments.get("cluster_type", None) == ClusterType.strong:
+            replication_factor = NflxKafkaCapacityModel.SC_DEFAULT_REPLICATION_FACTOR
+        state_gib = (
+            write_bytes.mid * retention_secs * replication_factor
+        ) / GIB_IN_BYTES

         return CapacityDesires(
             query_pattern=QueryPattern(
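
For intuition, default_desires now folds the replication factor into the default state-size estimate (write throughput * retention * replication factor) instead of multiplying the state size by copies_per_region later in _estimate_kafka_requirement. A minimal sketch of that arithmetic with hypothetical inputs (the 100 MiB/s producer rate below is assumed for illustration, not taken from the commit):

# Sketch of the new default state-size estimate (hypothetical inputs, not from the commit):
# usage = write throughput * retention * replication factor
GIB_IN_BYTES = 1024**3

write_bytes_per_sec = 100 * 1024 * 1024   # assume 100 MiB/s of producer traffic
retention_secs = 8 * 60 * 60              # the model's default retention, "PT8H"
replication_factor = 2                    # HA_DEFAULT_REPLICATION_FACTOR

state_gib = (write_bytes_per_sec * retention_secs * replication_factor) / GIB_IN_BYTES
print(state_gib)  # 5625.0 GiB of retained, replicated log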

tests/netflix/test_kafka.py

Lines changed: 97 additions & 1 deletion

@@ -1,8 +1,13 @@
 from service_capacity_modeling.capacity_planner import planner
+from service_capacity_modeling.interface import AccessPattern
 from service_capacity_modeling.interface import CapacityDesires
 from service_capacity_modeling.interface import certain_float
 from service_capacity_modeling.interface import CurrentClusters
 from service_capacity_modeling.interface import CurrentZoneClusterCapacity
+from service_capacity_modeling.interface import DataShape
+from service_capacity_modeling.interface import Drive
+from service_capacity_modeling.interface import DriveType
+from service_capacity_modeling.interface import FixedInterval
 from service_capacity_modeling.interface import Interval
 from service_capacity_modeling.interface import QueryPattern
 from service_capacity_modeling.models.common import normalize_cores

@@ -65,7 +70,7 @@ def test_kafka_large_scale():
         region="us-east-1",
         desires=desires,
         extra_model_arguments={
-            "cluster_type": "strong",
+            "cluster_type": ClusterType.strong,
             "retention": "PT4H",
         },
     )

@@ -339,3 +344,94 @@ def test_plan_certain():
     assert len(lr_clusters) >= 1
     print(lr_clusters[0].instance.name)
     assert lr_clusters[0].count == cluster_capacity.cluster_instance_count.high
+
+
+def test_plan_certain_data_shape():
+    """
+    Use current clusters cpu utilization to determine instance types directly as
+    supposed to extrapolating it from the Data Shape
+    """
+    cluster_capacity = CurrentZoneClusterCapacity(
+        cluster_instance_name="r7a.4xlarge",
+        cluster_drive=Drive(
+            name="gp3",
+            drive_type=DriveType.attached_ssd,
+            size_gib=5000,
+            block_size_kib=16,
+        ),
+        cluster_instance_count=Interval(low=15, mid=15, high=15, confidence=1),
+        cpu_utilization=Interval(
+            low=5.441147804260254,
+            mid=13.548842955300195,
+            high=25.11203956604004,
+            confidence=1,
+        ),
+        memory_utilization_gib=Interval(low=0, mid=0, high=0, confidence=1),
+        network_utilization_mbps=Interval(
+            low=4580.919447446355,
+            mid=19451.59814477331,
+            high=42963.441154527085,
+            confidence=1,
+        ),
+        disk_utilization_gib=Interval(
+            low=1341.579345703125,
+            mid=1940.8741284013684,
+            high=2437.607421875,
+            confidence=1,
+        ),
+    )
+
+    desires = CapacityDesires(
+        service_tier=1,
+        current_clusters=CurrentClusters(zonal=[cluster_capacity]),
+        query_pattern=QueryPattern(
+            access_pattern=AccessPattern(AccessPattern.latency),
+            # 2 consumers
+            estimated_read_per_second=Interval(low=2, mid=2, high=4, confidence=1),
+            # 1 producer
+            estimated_write_per_second=Interval(low=1, mid=1, high=1, confidence=0.98),
+            estimated_mean_read_latency_ms=Interval(low=1, mid=1, high=1, confidence=1),
+            estimated_mean_write_latency_ms=Interval(
+                low=1, mid=1, high=1, confidence=1
+            ),
+            estimated_mean_read_size_bytes=Interval(
+                low=1024, mid=1024, high=1024, confidence=1
+            ),
+            estimated_mean_write_size_bytes=Interval(
+                low=125000000, mid=579000000, high=1351000000, confidence=0.98
+            ),
+            estimated_read_parallelism=Interval(low=1, mid=1, high=1, confidence=1),
+            estimated_write_parallelism=Interval(low=1, mid=1, high=1, confidence=1),
+            read_latency_slo_ms=FixedInterval(low=0.4, mid=4, high=10, confidence=0.98),
+            write_latency_slo_ms=FixedInterval(
+                low=0.4, mid=4, high=10, confidence=0.98
+            ),
+        ),
+        data_shape=DataShape(
+            estimated_state_size_gib=Interval(
+                low=44000, mid=86000, high=91000, confidence=1
+            ),
+        ),
+    )
+
+    cap_plan = planner.plan_certain(
+        model_name="org.netflix.kafka",
+        region="us-east-1",
+        num_results=3,
+        num_regions=4,
+        desires=desires,
+        extra_model_arguments={
+            "cluster_type": ClusterType.ha,
+            "retention": "PT8H",
+            "require_attached_disks": True,
+            "required_zone_size": cluster_capacity.cluster_instance_count.mid,
+        },
+    )
+
+    assert len(cap_plan) >= 1
+    lr_clusters = cap_plan[0].candidate_clusters.zonal
+    assert len(lr_clusters) >= 1
+    print(lr_clusters[0].instance.name)
+    assert lr_clusters[0].count == cluster_capacity.cluster_instance_count.high
+    for lr in cap_plan:
+        print(lr.candidate_clusters.zonal[0])
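
One way to read the pairing of the two file changes: when a caller supplies estimated_state_size_gib directly, as the new test does, the old requirement formula scaled that figure by copies_per_region a second time. A rough before/after sketch using the test's mid estimate (illustrative only; the model's full disk accounting involves more than this one term):

# Illustrative before/after of the needed_disk change in _estimate_kafka_requirement,
# using the mid state-size estimate from test_plan_certain_data_shape.
import math

estimated_state_size_gib = 86_000   # DataShape mid value supplied by the new test
copies_per_region = 2               # HA default replication factor

needed_disk_old = math.ceil(estimated_state_size_gib * copies_per_region)  # 172000 GiB
needed_disk_new = math.ceil(estimated_state_size_gib)                      # 86000 GiB
print(needed_disk_old, needed_disk_new)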
