/usr/share/gocode/src/github.com/Shopify/sarama/produce_set_test.go is in golang-github-shopify-sarama-dev 1.9.0-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 | package sarama
import (
"testing"
"time"
)
func makeProduceSet() (*asyncProducer, *produceSet) {
parent := &asyncProducer{
conf: NewConfig(),
}
return parent, newProduceSet(parent)
}
func safeAddMessage(t *testing.T, ps *produceSet, msg *ProducerMessage) {
if err := ps.add(msg); err != nil {
t.Error(err)
}
}
func TestProduceSetInitial(t *testing.T) {
_, ps := makeProduceSet()
if !ps.empty() {
t.Error("New produceSet should be empty")
}
if ps.readyToFlush() {
t.Error("Empty produceSet must never be ready to flush")
}
}
func TestProduceSetAddingMessages(t *testing.T) {
parent, ps := makeProduceSet()
parent.conf.Producer.Flush.MaxMessages = 1000
msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
safeAddMessage(t, ps, msg)
if ps.empty() {
t.Error("set shouldn't be empty when a message is added")
}
if !ps.readyToFlush() {
t.Error("by default set should be ready to flush when any message is in place")
}
for i := 0; i < 999; i++ {
if ps.wouldOverflow(msg) {
t.Error("set shouldn't fill up after only", i+1, "messages")
}
safeAddMessage(t, ps, msg)
}
if !ps.wouldOverflow(msg) {
t.Error("set should be full after 1000 messages")
}
}
func TestProduceSetPartitionTracking(t *testing.T) {
_, ps := makeProduceSet()
m1 := &ProducerMessage{Topic: "t1", Partition: 0}
m2 := &ProducerMessage{Topic: "t1", Partition: 1}
m3 := &ProducerMessage{Topic: "t2", Partition: 0}
safeAddMessage(t, ps, m1)
safeAddMessage(t, ps, m2)
safeAddMessage(t, ps, m3)
seenT1P0 := false
seenT1P1 := false
seenT2P0 := false
ps.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
if len(msgs) != 1 {
t.Error("Wrong message count")
}
if topic == "t1" && partition == 0 {
seenT1P0 = true
} else if topic == "t1" && partition == 1 {
seenT1P1 = true
} else if topic == "t2" && partition == 0 {
seenT2P0 = true
}
})
if !seenT1P0 {
t.Error("Didn't see t1p0")
}
if !seenT1P1 {
t.Error("Didn't see t1p1")
}
if !seenT2P0 {
t.Error("Didn't see t2p0")
}
if len(ps.dropPartition("t1", 1)) != 1 {
t.Error("Got wrong messages back from dropping partition")
}
if ps.bufferCount != 2 {
t.Error("Incorrect buffer count after dropping partition")
}
}
func TestProduceSetRequestBuilding(t *testing.T) {
parent, ps := makeProduceSet()
parent.conf.Producer.RequiredAcks = WaitForAll
parent.conf.Producer.Timeout = 10 * time.Second
msg := &ProducerMessage{
Topic: "t1",
Partition: 0,
Key: StringEncoder(TestMessage),
Value: StringEncoder(TestMessage),
}
for i := 0; i < 10; i++ {
safeAddMessage(t, ps, msg)
}
msg.Partition = 1
for i := 0; i < 10; i++ {
safeAddMessage(t, ps, msg)
}
msg.Topic = "t2"
for i := 0; i < 10; i++ {
safeAddMessage(t, ps, msg)
}
req := ps.buildRequest()
if req.RequiredAcks != WaitForAll {
t.Error("RequiredAcks not set properly")
}
if req.Timeout != 10000 {
t.Error("Timeout not set properly")
}
if len(req.msgSets) != 2 {
t.Error("Wrong number of topics in request")
}
}
|