Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[features] Leader election + KUBERNETES_NAMESPACE environment variable #19

Merged
merged 6 commits into from
Dec 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 98 additions & 1 deletion wp-operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@
from kubernetes import client, config
from kubernetes.dynamic import DynamicClient
from kubernetes.client.exceptions import ApiException
from kubernetes.leaderelection import leaderelection
from kubernetes.leaderelection.resourcelock.configmaplock import ConfigMapLock
from kubernetes.leaderelection import electionconfig
import base64
import os
import subprocess
import sys
import yaml
import re
from datetime import datetime, timezone
import threading
import time

import secrets
import uuid

class Config:
secret_name = "nginx-conf-site-tree"
Expand Down Expand Up @@ -652,6 +658,97 @@ def ensure_wp_crd_exists(cls):
return False


class NamespaceFromEnv:
@classmethod
def guess (cls):
if 'KUBERNETES_NAMESPACE' in os.environ:
return os.environ['KUBERNETES_NAMESPACE']
else:
# Poor man's `click`
for i in range(1, len(sys.argv)):
if (i < len(sys.argv)) and (sys.argv[i] in ('--namespace', '-n')):
return sys.argv[i + 1]
else:
matched = re.match('--namespace=(.*)$', sys.argv[i])
if matched:
return matched[1]

raise ValueError('This is a namespaced-*only* operator. Please set KUBERNETES_NAMESPACE or pass --namespace.')

guessed = None
@classmethod
def get (cls):
if cls.guessed is None:
cls.guessed = cls.guess()

return cls.guessed

@classmethod
def setup (cls):
namespace = cls.get()
logging.info(f'Running in namespace {namespace}')
os.environ['KUBERNETES_NAMESPACE'] = namespace
try:
dashdash_position = sys.argv.index('--')
except ValueError:
dashdash_position = len(sys.argv)
sys.argv[dashdash_position:dashdash_position] = [f'--namespace={namespace}']


class NamespaceLeaderElection:
def __init__(self):
self.lock_namespace = NamespaceFromEnv.get()
self.lock_name = f"wpn-operator-lock"
self.candidate_id = uuid.uuid4()
self.config = electionconfig.Config(
ConfigMapLock(
self.lock_name,
self.lock_namespace,
self.candidate_id
),
lease_duration = 17,
renew_deadline = 15,
retry_period = 5,
onstarted_leading = self.start_kopf_in_thread,
onstopped_leading = self.exit_immediately
)

def start_kopf_in_thread(self):
print(f"Instance {self.candidate_id} is the leader for namespace {self.lock_namespace}.")
def do_run_kopf ():
sys.exit(kopf.cli.main())

threading.Thread(target=do_run_kopf).run()

def exit_immediately(self):
print(f"Instance {self.candidate_id} stopped being the leader for namespace {self.lock_namespace}.")
sys.exit(0)

@classmethod
def go (cls):
config.load_kube_config()
leader_election = cls()

class QuietLeaderElection(leaderelection.LeaderElection):
def update_lock(self, leader_election_record):
"""(Copied and) overridden to silence the “has successfully acquired lease” message every 5 seconds."""
# Update object with latest election record
update_status = self.election_config.lock.update(self.election_config.lock.name,
self.election_config.lock.namespace,
leader_election_record)

if update_status is False:
logging.info("{} failed to acquire lease".format(leader_election_record.holder_identity))
return False

self.observed_record = leader_election_record
self.observed_time_milliseconds = int(time.time() * 1000)
return True

QuietLeaderElection(leader_election.config).run()


if __name__ == '__main__':
Config.load_from_command_line()
sys.exit(kopf.cli.main())
NamespaceFromEnv.setup()
NamespaceLeaderElection.go()