Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Commit

Permalink
Adding support for EventBridge
Browse files Browse the repository at this point in the history
  • Loading branch information
jmnarloch committed Feb 24, 2020
1 parent 33e1535 commit c296334
Show file tree
Hide file tree
Showing 12 changed files with 801 additions and 14 deletions.
13 changes: 0 additions & 13 deletions .idea/misc.xml

This file was deleted.

2 changes: 1 addition & 1 deletion spring-cloud-aws-dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<name>Spring Cloud AWS Dependencies</name>
<description>Spring Cloud AWS Dependencies</description>
<properties>
<aws-java-sdk.version>1.11.415</aws-java-sdk.version>
<aws-java-sdk.version>1.11.624</aws-java-sdk.version>
<elasticache.version>1.1.1</elasticache.version>
<jmemcached.version>1.0.0</jmemcached.version>
<spring-cloud-context.version>1.3.2.RELEASE</spring-cloud-context.version>
Expand Down
4 changes: 4 additions & 0 deletions spring-cloud-aws-messaging/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-events</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.aws.messaging.config.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.context.annotation.Import;

/**
* @author Jakub Narloch
* @since 2.3.0
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import({ EventBridgeConfiguration.class })
public @interface EnableEventBridge {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.aws.messaging.config.annotation;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents;
import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEventsClient;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.context.annotation.ConditionalOnMissingAmazonClient;
import org.springframework.cloud.aws.core.config.AmazonWebserviceClientFactoryBean;
import org.springframework.cloud.aws.core.region.RegionProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author Jakub Narloch
* @since 2.3.0
*/
@Configuration(proxyBeanMethods = false)
public class EventBridgeConfiguration {

@Autowired(required = false)
private AWSCredentialsProvider awsCredentialsProvider;

@Autowired(required = false)
private RegionProvider regionProvider;

@ConditionalOnMissingAmazonClient(AmazonCloudWatchEvents.class)
@Bean
public AmazonWebserviceClientFactoryBean<AmazonCloudWatchEventsClient> amazonEvents() {
return new AmazonWebserviceClientFactoryBean<>(AmazonCloudWatchEventsClient.class,
this.awsCredentialsProvider, this.regionProvider);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.aws.messaging.core;

import java.util.Optional;

import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents;
import com.amazonaws.services.cloudwatchevents.model.PutEventsRequest;
import com.amazonaws.services.cloudwatchevents.model.PutEventsRequestEntry;

import org.springframework.messaging.Message;
import org.springframework.messaging.support.AbstractMessageChannel;

/**
* @author Jakub Narloch
* @since 2.3.0
*/
public class EventBusMessageChannel extends AbstractMessageChannel {

/**
* The 'source' message header.
*/
public static final String EVENT_SOURCE_HEADER = "EVENT_SOURCE_HEADER";

/**
* The 'detail-type' message header.
*/
public static final String EVENT_DETAIL_TYPE_HEADER = "EVENT_DETAIL_TYPE_HEADER";

private final AmazonCloudWatchEvents amazonEvents;

private final String eventBus;

public EventBusMessageChannel(AmazonCloudWatchEvents amazonEvents, String eventBus) {
this.amazonEvents = amazonEvents;
this.eventBus = eventBus;
}

@Override
protected boolean sendInternal(Message<?> message, long timeout) {
PutEventsRequestEntry entry = new PutEventsRequestEntry()
.withEventBusName(eventBus).withSource(findEventSource(message))
.withDetailType(findEventDetailType(message))
.withDetail(message.getPayload().toString());
amazonEvents.putEvents(new PutEventsRequest().withEntries(entry));
return true;
}

private static String findEventSource(Message<?> message) {
return findHeaderValue(message, EVENT_SOURCE_HEADER);
}

private static String findEventDetailType(Message<?> message) {
return findHeaderValue(message, EVENT_DETAIL_TYPE_HEADER);
}

private static String findHeaderValue(Message<?> message, String header) {
return Optional.ofNullable(message.getHeaders().get(header)).map(Object::toString)
.orElse(null);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.aws.messaging.core;

import java.util.HashMap;
import java.util.Map;

import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents;

import org.springframework.cloud.aws.core.env.ResourceIdResolver;
import org.springframework.cloud.aws.messaging.core.support.AbstractMessageChannelMessagingSendingTemplate;
import org.springframework.cloud.aws.messaging.support.destination.DynamicEventBusDestinationResolver;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.core.DestinationResolver;

/**
* @author Jakub Narloch
* @since 2.3.0
*/
public class EventsMessagingTemplate
extends AbstractMessageChannelMessagingSendingTemplate<EventBusMessageChannel> {

private final AmazonCloudWatchEvents amazonEvents;

public EventsMessagingTemplate(AmazonCloudWatchEvents amazonEvents) {
this(amazonEvents, (ResourceIdResolver) null, null);
}

public EventsMessagingTemplate(AmazonCloudWatchEvents amazonEvents,
ResourceIdResolver resourceIdResolver, MessageConverter messageConverter) {
super(new DynamicEventBusDestinationResolver(amazonEvents, resourceIdResolver));
this.amazonEvents = amazonEvents;
initMessageConverter(messageConverter);
}

public EventsMessagingTemplate(AmazonCloudWatchEvents amazonEvents,
DestinationResolver<String> destinationResolver,
MessageConverter messageConverter) {
super(destinationResolver);
this.amazonEvents = amazonEvents;
initMessageConverter(messageConverter);
}

@Override
protected EventBusMessageChannel resolveMessageChannel(
String physicalResourceIdentifier) {
return new EventBusMessageChannel(this.amazonEvents, physicalResourceIdentifier);
}

/**
* Convenience method that sends an event identified by {@literal source} and
* {@literal detailType} with the given {@literal message} to the
* {@literal destination}.
* @param source The event source
* @param detailType The event detail-type
* @param message The event body to send
*/
public void sendEvent(String source, String detailType, Object message) {
Map<String, Object> headers = new HashMap<>();
headers.put(EventBusMessageChannel.EVENT_SOURCE_HEADER, source);
headers.put(EventBusMessageChannel.EVENT_DETAIL_TYPE_HEADER, detailType);
this.convertAndSend(getRequiredDefaultDestination(), message, headers);
}

/**
* Convenience method that sends an event identified by {@literal source} and
* {@literal detailType} with the given {@literal message} to the specific
* {@literal eventBus}.
* @param eventBus The event bus name
* @param source The event source
* @param detailType The event detail-type
* @param message The event body to send
*/
public void sendEvent(String eventBus, String source, String detailType,
Object message) {
Map<String, Object> headers = new HashMap<>();
headers.put(EventBusMessageChannel.EVENT_SOURCE_HEADER, source);
headers.put(EventBusMessageChannel.EVENT_DETAIL_TYPE_HEADER, detailType);
this.convertAndSend(eventBus, message, headers);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.aws.messaging.support.destination;

import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents;
import com.amazonaws.services.cloudwatchevents.model.CreateEventBusRequest;

import org.springframework.cloud.aws.core.env.ResourceIdResolver;
import org.springframework.cloud.aws.core.naming.AmazonResourceName;
import org.springframework.messaging.core.DestinationResolutionException;
import org.springframework.messaging.core.DestinationResolver;

/**
* @author Jakub Narloch
* @since 2.3.0
*/
public class DynamicEventBusDestinationResolver implements DestinationResolver<String> {

private final AmazonCloudWatchEvents amazonEvents;

private final ResourceIdResolver resourceIdResolver;

private boolean autoCreate;

public DynamicEventBusDestinationResolver(AmazonCloudWatchEvents amazonEvents) {
this(amazonEvents, null);
}

public DynamicEventBusDestinationResolver(AmazonCloudWatchEvents amazonEvents,
ResourceIdResolver resourceIdResolver) {
this.amazonEvents = amazonEvents;
this.resourceIdResolver = resourceIdResolver;
}

public void setAutoCreate(boolean autoCreate) {
this.autoCreate = autoCreate;
}

@Override
public String resolveDestination(String name) throws DestinationResolutionException {
if (autoCreate) {
amazonEvents.createEventBus(new CreateEventBusRequest().withName(name))
.getEventBusArn();
return name;
}

String eventBusName = name;
if (resourceIdResolver != null) {
eventBusName = resourceIdResolver.resolveToPhysicalResourceId(name);
}

if (eventBusName != null
&& AmazonResourceName.isValidAmazonResourceName(eventBusName)) {
return AmazonResourceName.fromString(eventBusName).getResourceName();
}

return eventBusName;
}

}
Loading

0 comments on commit c296334

Please sign in to comment.