Pubsub
package pubsub
import (
"context"
"fmt"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)
// pubsubContainer bundles a Pub/Sub emulator container with the address
// that clients dial to reach it.
type pubsubContainer struct {
	// Container is the embedded testcontainers handle, so lifecycle methods
	// such as Terminate can be called directly on a pubsubContainer.
	testcontainers.Container

	// URI is the emulator endpoint; startContainer fills it in as "host:port".
	URI string
}
// startContainer launches the Pub/Sub emulator from the Cloud SDK image and
// returns a handle whose URI field holds the emulator's "host:port" address.
//
// The container exposes port 8085/tcp and is treated as ready once its log
// output contains "started". If the mapped port or the host cannot be resolved
// after the container is already running, the container is terminated before
// the error is returned so that it does not leak. On success the caller owns
// the container and is responsible for calling Terminate on it.
func startContainer(ctx context.Context) (*pubsubContainer, error) {
	req := testcontainers.ContainerRequest{
		Image:        "gcr.io/google.com/cloudsdktool/cloud-sdk:367.0.0-emulators",
		ExposedPorts: []string{"8085/tcp"},
		WaitingFor:   wait.ForLog("started"),
		Cmd: []string{
			"/bin/sh",
			"-c",
			"gcloud beta emulators pubsub start --host-port 0.0.0.0:8085",
		},
	}

	container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: req,
		Started:          true,
	})
	if err != nil {
		return nil, fmt.Errorf("starting pubsub emulator container: %w", err)
	}

	// abort tears down the already-running container before reporting err,
	// so a failure during address resolution does not leave it behind.
	abort := func(err error) (*pubsubContainer, error) {
		if termErr := container.Terminate(ctx); termErr != nil {
			return nil, fmt.Errorf("%w (terminating container: %v)", err, termErr)
		}
		return nil, err
	}

	mappedPort, err := container.MappedPort(ctx, "8085")
	if err != nil {
		return abort(fmt.Errorf("resolving mapped port for 8085: %w", err))
	}

	hostIP, err := container.Host(ctx)
	if err != nil {
		return abort(fmt.Errorf("resolving container host: %w", err))
	}

	uri := fmt.Sprintf("%s:%s", hostIP, mappedPort.Port())
	return &pubsubContainer{Container: container, URI: uri}, nil
}
package pubsub
import (
	"context"
	"testing"
	"time"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)
// TestPubsub starts the Pub/Sub emulator container, publishes a single
// message to a freshly created topic and checks that the same payload is
// delivered through a subscription on that topic.
func TestPubsub(t *testing.T) {
	ctx := context.Background()

	container, err := startContainer(ctx)
	if err != nil {
		t.Fatal(err)
	}
	// Clean up the container after the test is complete
	t.Cleanup(func() {
		if err := container.Terminate(ctx); err != nil {
			t.Fatalf("failed to terminate container: %s", err)
		}
	})

	conn, err := grpc.Dial(container.URI, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatal(err)
	}
	// Release the connection even if client creation fails below.
	defer conn.Close()

	options := []option.ClientOption{option.WithGRPCConn(conn)}
	client, err := pubsub.NewClient(ctx, "my-project-id", options...)
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	topic, err := client.CreateTopic(ctx, "greetings")
	if err != nil {
		t.Fatal(err)
	}
	// Stop the topic's background publishing once the test is done with it.
	defer topic.Stop()

	subscription, err := client.CreateSubscription(ctx, "subscription",
		pubsub.SubscriptionConfig{Topic: topic})
	if err != nil {
		t.Fatal(err)
	}

	result := topic.Publish(ctx, &pubsub.Message{Data: []byte("Hello World")})
	if _, err = result.Get(ctx); err != nil {
		t.Fatal(err)
	}

	// perform assertions
	var data []byte
	// Bound the wait so a message that never arrives fails the test instead
	// of blocking until the global test timeout.
	cctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	err = subscription.Receive(cctx, func(_ context.Context, m *pubsub.Message) {
		data = m.Data
		m.Ack()
		// One message is all the test needs; cancelling makes Receive return.
		cancel()
	})
	if err != nil {
		t.Fatal(err)
	}
	if cctx.Err() == context.DeadlineExceeded {
		t.Fatal("timed out waiting for the published message")
	}
	if string(data) != "Hello World" {
		t.Fatalf("Expected value %s. Got %s.", "Hello World", data)
	}
}