-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathkafka_spec.rb
76 lines (67 loc) · 3.5 KB
/
kafka_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require 'logstash/outputs/kafka'
require 'json'
describe "outputs/kafka" do
let (:simple_kafka_config) {{'topic_id' => 'test'}}
let (:event) { LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic', 'host' => '172.0.0.1',
'@timestamp' => LogStash::Timestamp.now, 'other_timestamp' => 1487669184685}) }
context 'when initializing' do
it "should register" do
output = LogStash::Plugin.lookup("output", "kafka").new(simple_kafka_config)
expect {output.register}.to_not raise_error
end
it 'should populate kafka config with default values' do
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config)
insist {kafka.bootstrap_servers} == 'localhost:9092'
insist {kafka.topic_id} == 'test'
insist {kafka.key_serializer} == 'org.apache.kafka.common.serialization.StringSerializer'
end
end
context 'when outputting messages' do
it 'should send logstash event to kafka broker' do
expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
.with(an_instance_of(org.apache.kafka.clients.producer.ProducerRecord))
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config)
kafka.register
kafka.receive(event)
end
it 'should support Event#sprintf placeholders in topic_id' do
topic_field = 'topic_name'
expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
.with("my_topic", nil, nil, nil, event.to_s)
expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
kafka = LogStash::Outputs::Kafka.new({'topic_id' => "%{#{topic_field}}"})
kafka.register
kafka.receive(event)
end
it 'should support field referenced message_keys' do
expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
.with("test", nil, nil, "172.0.0.1", event.to_s)
expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_key" => "%{host}"}))
kafka.register
kafka.receive(event)
end
it 'should support timestamp from specified message field' do
expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
.with("test", nil, event.get('@timestamp').time.to_i, nil, event.to_s)
expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_timestamp" => "@timestamp"}))
kafka.register
kafka.receive(event)
end
it 'should support field referenced message_timestamp' do
expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
.with("test", nil, event.get('other_timestamp').to_i, nil, event.to_s)
expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_timestamp" => "%{other_timestamp}"}))
kafka.register
kafka.receive(event)
end
it 'should raise config error when truststore location is not set and ssl is enabled' do
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"ssl" => "true"}))
expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/)
end
end
end