Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][misc] PIP-264: Add OpenTelemetry authentication and token metrics #23016

Merged
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
973375d
Add OpenTelemetry dependency to pulsar-broker-common
dragosvictor May 3, 2024
80e2b56
Add authentication counter metric
dragosvictor May 3, 2024
169acfc
Define AuthenticationProviderBase
dragosvictor May 3, 2024
7d9da0d
Define AuthenticationProvider.InitParameters
dragosvictor May 3, 2024
c1e2871
Implement AuthenticationProvider.initialize(InitParameters) interface
dragosvictor May 3, 2024
acbee59
Update static Prometheus counters in AuthenticationMetrics instance c…
dragosvictor May 3, 2024
83a64fc
Draft AuthenticationService constructor fixes
dragosvictor May 3, 2024
964ee7e
Cosmetic fixes
dragosvictor May 3, 2024
1892b58
Throw exception in AuthenticationProviderBase.initialize(ServiceConfi…
dragosvictor May 3, 2024
ef653e0
Merge remote-tracking branch 'origin/master' into dmisca-pip-264-auth…
dragosvictor Jun 25, 2024
f00889d
Checkstyle fixes
dragosvictor Jun 25, 2024
d205e24
Update
dragosvictor Jun 25, 2024
c5bfc51
Add authentication metrics test
dragosvictor Jun 25, 2024
0f462de
Merge remote-tracking branch 'origin/master' into dmisca-pip-264-auth…
dragosvictor Jul 2, 2024
c5f6938
Merge branch 'master' into dmisca-pip-264-authentication-metrics
dragosvictor Jul 5, 2024
3e3d3ee
Merge remote-tracking branch 'origin/master' into dmisca-pip-264-auth…
dragosvictor Jul 8, 2024
2220ebe
Refactor
dragosvictor Jul 8, 2024
675f18b
Update test
dragosvictor Jul 8, 2024
42c94b6
Update AuthenticationProviderList
dragosvictor Jul 9, 2024
74918ca
Fix tests
dragosvictor Jul 9, 2024
2b0bdc2
Update overloads
dragosvictor Jul 9, 2024
b1ce5b0
Collect auth provider parameters into class AuthenticationProviderCon…
dragosvictor Jul 9, 2024
c947325
Fix test
dragosvictor Jul 9, 2024
2c47494
Refactor AuthenticationMetrics
dragosvictor Jul 9, 2024
4c2fc2c
Add AuthenticationMetricsToken class
dragosvictor Jul 9, 2024
7fdc594
Remove AuthenticationProviderBase class
dragosvictor Jul 9, 2024
21a4c37
Fix tests in AuthenticationProviderOpenIDTest
dragosvictor Jul 10, 2024
913e471
Merge remote-tracking branch 'origin/master' into dmisca-pip-264-auth…
dragosvictor Aug 20, 2024
dee6397
Improve granularity of pulsar_expiring_token_minutes Prometheus histo…
dragosvictor Aug 20, 2024
fa9eca3
Fix test
dragosvictor Aug 20, 2024
5ab1c3a
Merge remote-tracking branch 'origin/master' into dmisca-pip-264-auth…
dragosvictor Aug 26, 2024
1be6157
Use same bucket boundaries for Prometheus and OTel token duration met…
dragosvictor Aug 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class AuthenticationProviderAthenz implements AuthenticationProvider {
private List<String> domainNameList = null;
private int allowedOffset = 30;

private AuthenticationMetrics authenticationMetrics;

public enum ErrorCode {
UNKNOWN,
NO_CLIENT,
Expand All @@ -54,6 +56,14 @@ public enum ErrorCode {

@Override
public void initialize(ServiceConfiguration config) throws IOException {
initialize(Context.builder().config(config).build());
}

@Override
public void initialize(Context context) throws IOException {
authenticationMetrics = new AuthenticationMetrics(context.getOpenTelemetry(),
getClass().getSimpleName(), getAuthMethodName());
var config = context.getConfig();
String domainNames;
if (config.getProperty(DOMAIN_NAME_LIST) != null) {
domainNames = (String) config.getProperty(DOMAIN_NAME_LIST);
Expand Down Expand Up @@ -86,6 +96,11 @@ public String getAuthMethodName() {
return "athenz";
}

@Override
public void incrementFailureMetric(Enum<?> errorCode) {
authenticationMetrics.recordFailure(errorCode);
}

@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
SocketAddress clientAddress;
Expand Down Expand Up @@ -141,7 +156,7 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat

if (token.validate(ztsPublicKey, allowedOffset, false, null)) {
log.debug("Athenz Role Token : {}, Authenticated for Client: {}", roleToken, clientAddress);
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName());
authenticationMetrics.recordSuccess();
return token.getPrincipal();
} else {
errorCode = ErrorCode.INVALID_TOKEN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,16 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

import com.yahoo.athenz.auth.token.RoleToken;
import com.yahoo.athenz.zpe.ZpeConsts;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import javax.naming.AuthenticationException;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand All @@ -55,7 +51,7 @@ public void setup() throws Exception {

// Initialize authentication provider
provider = new AuthenticationProviderAthenz();
provider.initialize(config);
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());

// Specify Athenz configuration file for AuthZpeClient which is used in AuthenticationProviderAthenz
System.setProperty(ZpeConsts.ZPE_PROP_ATHENZ_CONF, "./src/test/resources/athenz.conf.test");
Expand All @@ -69,7 +65,7 @@ public void testInitilizeFromSystemPropeties() {
emptyConf.setProperties(emptyProp);
AuthenticationProviderAthenz sysPropProvider1 = new AuthenticationProviderAthenz();
try {
sysPropProvider1.initialize(emptyConf);
sysPropProvider1.initialize(AuthenticationProvider.Context.builder().config(emptyConf).build());
assertEquals(sysPropProvider1.getAllowedOffset(), 30); // default allowed offset is 30 sec
} catch (Exception e) {
fail("Fail to Read pulsar.athenz.domain.names from System Properties");
Expand All @@ -78,7 +74,7 @@ public void testInitilizeFromSystemPropeties() {
System.setProperty("pulsar.athenz.role.token_allowed_offset", "0");
AuthenticationProviderAthenz sysPropProvider2 = new AuthenticationProviderAthenz();
try {
sysPropProvider2.initialize(config);
sysPropProvider2.initialize(AuthenticationProvider.Context.builder().config(config).build());
assertEquals(sysPropProvider2.getAllowedOffset(), 0);
} catch (Exception e) {
fail("Failed to get allowed offset from system property");
Expand All @@ -87,15 +83,15 @@ public void testInitilizeFromSystemPropeties() {
System.setProperty("pulsar.athenz.role.token_allowed_offset", "invalid");
AuthenticationProviderAthenz sysPropProvider3 = new AuthenticationProviderAthenz();
try {
sysPropProvider3.initialize(config);
sysPropProvider3.initialize(AuthenticationProvider.Context.builder().config(config).build());
fail("Invalid allowed offset should not be specified");
} catch (IOException e) {
}

System.setProperty("pulsar.athenz.role.token_allowed_offset", "-1");
AuthenticationProviderAthenz sysPropProvider4 = new AuthenticationProviderAthenz();
try {
sysPropProvider4.initialize(config);
sysPropProvider4.initialize(AuthenticationProvider.Context.builder().config(config).build());
fail("Negative allowed offset should not be specified");
} catch (IOException e) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@
public class AuthenticationProviderOpenID implements AuthenticationProvider {
private static final Logger log = LoggerFactory.getLogger(AuthenticationProviderOpenID.class);

private static final String SIMPLE_NAME = AuthenticationProviderOpenID.class.getSimpleName();

// Must match the value used by the OAuth2 Client Plugin.
private static final String AUTH_METHOD_NAME = "token";

Expand Down Expand Up @@ -148,8 +146,18 @@ public class AuthenticationProviderOpenID implements AuthenticationProvider {
private String[] allowedAudiences;
private ApiClient k8sApiClient;

private AuthenticationMetrics authenticationMetrics;

@Override
public void initialize(ServiceConfiguration config) throws IOException {
initialize(Context.builder().config(config).build());
}

@Override
public void initialize(Context context) throws IOException {
authenticationMetrics = new AuthenticationMetrics(context.getOpenTelemetry(),
getClass().getSimpleName(), getAuthMethodName());
var config = context.getConfig();
this.allowedAudiences = validateAllowedAudiences(getConfigValueAsSet(config, ALLOWED_AUDIENCES));
this.roleClaim = getConfigValueAsString(config, ROLE_CLAIM, ROLE_CLAIM_DEFAULT);
this.isRoleClaimNotSubject = !ROLE_CLAIM_DEFAULT.equals(roleClaim);
Expand Down Expand Up @@ -181,15 +189,20 @@ public void initialize(ServiceConfiguration config) throws IOException {
.build();
httpClient = new DefaultAsyncHttpClient(clientConfig);
k8sApiClient = fallbackDiscoveryMode != FallbackDiscoveryMode.DISABLED ? Config.defaultClient() : null;
this.openIDProviderMetadataCache = new OpenIDProviderMetadataCache(config, httpClient, k8sApiClient);
this.jwksCache = new JwksCache(config, httpClient, k8sApiClient);
this.openIDProviderMetadataCache = new OpenIDProviderMetadataCache(this, config, httpClient, k8sApiClient);
this.jwksCache = new JwksCache(this, config, httpClient, k8sApiClient);
}

@Override
public String getAuthMethodName() {
return AUTH_METHOD_NAME;
}

@Override
public void incrementFailureMetric(Enum<?> errorCode) {
authenticationMetrics.recordFailure(errorCode);
}

/**
* Authenticate the parameterized {@link AuthenticationDataSource} by verifying the issuer is an allowed issuer,
* then retrieving the JWKS URI from the issuer, then retrieving the Public key from the JWKS URI, and finally
Expand Down Expand Up @@ -219,7 +232,7 @@ CompletableFuture<DecodedJWT> authenticateTokenAsync(AuthenticationDataSource au
return authenticateToken(token)
.whenComplete((jwt, e) -> {
if (jwt != null) {
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName());
authenticationMetrics.recordSuccess();
}
// Failure metrics are incremented within methods above
});
Expand Down Expand Up @@ -463,10 +476,6 @@ DecodedJWT verifyJWT(PublicKey publicKey,
}
}

static void incrementFailureMetric(AuthenticationExceptionCode code) {
AuthenticationMetrics.authenticateFailure(SIMPLE_NAME, AUTH_METHOD_NAME, code);
}

/**
* Validate the configured allow list of allowedIssuers. The allowedIssuers set must be nonempty in order for
* the plugin to authenticate any token. Thus, it fails initialization if the configuration is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.authentication.oidc;

import static org.apache.pulsar.broker.authentication.oidc.AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY;
import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS;
import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS_DEFAULT;
import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_REFRESH_AFTER_WRITE_SECONDS;
Expand All @@ -26,7 +27,6 @@
import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE_DEFAULT;
import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.KEY_ID_CACHE_MISS_REFRESH_SECONDS;
import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.KEY_ID_CACHE_MISS_REFRESH_SECONDS_DEFAULT;
import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.incrementFailureMetric;
import static org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsInt;
import com.auth0.jwk.Jwk;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -49,6 +49,7 @@
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.asynchttpclient.AsyncHttpClient;

public class JwksCache {
Expand All @@ -60,8 +61,11 @@ public class JwksCache {
private final ObjectReader reader = new ObjectMapper().readerFor(HashMap.class);
private final AsyncHttpClient httpClient;
private final OpenidApi openidApi;
private final AuthenticationProvider authenticationProvider;

JwksCache(ServiceConfiguration config, AsyncHttpClient httpClient, ApiClient apiClient) throws IOException {
JwksCache(AuthenticationProvider authenticationProvider, ServiceConfiguration config,
AsyncHttpClient httpClient, ApiClient apiClient) throws IOException {
this.authenticationProvider = authenticationProvider;
// Store the clients
this.httpClient = httpClient;
this.openidApi = apiClient != null ? new OpenidApi(apiClient) : null;
Expand Down Expand Up @@ -91,7 +95,7 @@ public class JwksCache {

CompletableFuture<Jwk> getJwk(String jwksUri, String keyId) {
if (jwksUri == null) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
return CompletableFuture.failedFuture(new IllegalArgumentException("jwksUri must not be null."));
}
return getJwkAndMaybeReload(Optional.of(jwksUri), keyId, false);
Expand Down Expand Up @@ -139,10 +143,10 @@ private CompletableFuture<List<Jwk>> getJwksFromJwksUri(String jwksUri) {
reader.readValue(result.getResponseBodyAsBytes());
future.complete(convertToJwks(jwksUri, jwks));
} catch (AuthenticationException e) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
future.completeExceptionally(e);
} catch (Exception e) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
future.completeExceptionally(new AuthenticationException(
"Error retrieving public key at " + jwksUri + ": " + e.getMessage()));
}
Expand All @@ -152,7 +156,7 @@ private CompletableFuture<List<Jwk>> getJwksFromJwksUri(String jwksUri) {

CompletableFuture<Jwk> getJwkFromKubernetesApiServer(String keyId) {
if (openidApi == null) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
return CompletableFuture.failedFuture(new AuthenticationException(
"Failed to retrieve public key from Kubernetes API server: Kubernetes fallback is not enabled."));
}
Expand All @@ -165,7 +169,7 @@ private CompletableFuture<List<Jwk>> getJwksFromKubernetesApiServer() {
openidApi.getServiceAccountIssuerOpenIDKeysetAsync(new ApiCallback<String>() {
@Override
public void onFailure(ApiException e, int statusCode, Map<String, List<String>> responseHeaders) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
// We want the message and responseBody here: https://github.com/kubernetes-client/java/issues/2066.
future.completeExceptionally(
new AuthenticationException("Failed to retrieve public key from Kubernetes API server. "
Expand All @@ -178,10 +182,10 @@ public void onSuccess(String result, int statusCode, Map<String, List<String>> r
HashMap<String, Object> jwks = reader.readValue(result);
future.complete(convertToJwks("Kubernetes API server", jwks));
} catch (AuthenticationException e) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
future.completeExceptionally(e);
} catch (Exception e) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
future.completeExceptionally(new AuthenticationException(
"Error retrieving public key at Kubernetes API server: " + e.getMessage()));
}
Expand All @@ -198,7 +202,7 @@ public void onDownloadProgress(long bytesRead, long contentLength, boolean done)
}
});
} catch (ApiException e) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
future.completeExceptionally(
new AuthenticationException("Failed to retrieve public key from Kubernetes API server: "
+ e.getMessage()));
Expand All @@ -212,7 +216,7 @@ private Jwk getJwkForKID(Optional<String> maybeJwksUri, List<Jwk> jwks, String k
return jwk;
}
}
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
throw new IllegalArgumentException("No JWK found for Key ID " + keyId);
}

Expand Down
Loading
Loading