Skip to content

Commit a558bb8

Browse files
Fixes some flaky tests in the build as well as the case when tests start before kafka is ready. (#1349)
1 parent a8e5eab commit a558bb8

File tree

5 files changed

+39
-7
lines changed

5 files changed

+39
-7
lines changed

.circleci/config.yml

+3
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ jobs:
5454
key: kafka-go-mod-{{ checksum "go.sum" }}-1
5555
paths:
5656
- /go/pkg/mod
57+
- run:
58+
name: Wait for kafka
59+
command: ./scripts/wait-for-kafka.sh
5760
- run:
5861
name: Test kafka-go
5962
command: go test -race -cover ./...

addoffsetstotxn_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@ func TestClientAddOffsetsToTxn(t *testing.T) {
2222
defer shutdown()
2323

2424
err := clientCreateTopic(client, topic, 3)
25+
defer deleteTopic(t, topic)
2526
if err != nil {
2627
t.Fatal(err)
2728
}
2829

2930
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
31+
waitForTopic(ctx, t, topic)
3032
defer cancel()
3133
respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{
3234
Addr: client.Addr,

reader_test.go

+12-7
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ func createTopic(t *testing.T, topic string, partitions int) {
309309
ReplicationFactor: 1,
310310
},
311311
},
312-
Timeout: milliseconds(time.Second),
312+
Timeout: milliseconds(5 * time.Second),
313313
})
314314
if err != nil {
315315
if !errors.Is(err, TopicAlreadyExists) {
@@ -364,8 +364,8 @@ func waitForTopic(ctx context.Context, t *testing.T, topic string) {
364364
}
365365
}
366366

367-
t.Logf("retrying after 1s")
368-
time.Sleep(time.Second)
367+
t.Logf("retrying after 100ms")
368+
time.Sleep(100 * time.Millisecond)
369369
continue
370370
}
371371
}
@@ -1559,17 +1559,22 @@ func TestConsumerGroupWithGroupTopicsSingle(t *testing.T) {
15591559
}
15601560
}
15611561

1562-
func TestConsumerGroupWithGroupTopicsMultple(t *testing.T) {
1563-
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1562+
func TestConsumerGroupWithGroupTopicsMultiple(t *testing.T) {
1563+
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
15641564
defer cancel()
15651565

15661566
client, shutdown := newLocalClient()
15671567
defer shutdown()
1568-
1568+
t1 := makeTopic()
1569+
createTopic(t, t1, 1)
1570+
defer deleteTopic(t, t1)
1571+
t2 := makeTopic()
1572+
createTopic(t, t2, 1)
1573+
defer deleteTopic(t, t2)
15691574
conf := ReaderConfig{
15701575
Brokers: []string{"localhost:9092"},
15711576
GroupID: makeGroupID(),
1572-
GroupTopics: []string{makeTopic(), makeTopic()},
1577+
GroupTopics: []string{t1, t2},
15731578
MaxWait: time.Second,
15741579
PartitionWatchInterval: 100 * time.Millisecond,
15751580
WatchPartitionChanges: true,

scripts/wait-for-kafka.sh

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#/bin/bash
2+
3+
COUNTER=0;
4+
echo foo | nc localhost 9092
5+
STATUS=$?
6+
ATTEMPTS=60
7+
until [ ${STATUS} -eq 0 ] || [ "$COUNTER" -ge "${ATTEMPTS}" ];
8+
do
9+
let COUNTER=$COUNTER+1;
10+
sleep 1;
11+
echo "[$COUNTER] waiting for 9092 port to be open";
12+
echo foo | nc localhost 9092
13+
STATUS=$?
14+
done
15+
16+
if [ "${COUNTER}" -gt "${ATTEMPTS}" ];
17+
then
18+
echo "Kafka is not running, failing"
19+
exit 1
20+
fi

txnoffsetcommit_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ func TestClientTxnOffsetCommit(t *testing.T) {
2121

2222
client, shutdown := newLocalClientWithTopic(topic, 1)
2323
defer shutdown()
24+
waitForTopic(context.TODO(), t, topic)
25+
defer deleteTopic(t, topic)
2426

2527
now := time.Now()
2628

0 commit comments

Comments
 (0)