| /* |
| * |
| * Copyright 2024 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package xds_test |
| |
| import ( |
| "context" |
| "fmt" |
| "io" |
| "net" |
| "strconv" |
| "strings" |
| "sync" |
| "testing" |
| "time" |
| |
| "github.com/google/go-cmp/cmp" |
| "github.com/google/uuid" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/connectivity" |
| "google.golang.org/grpc/credentials/insecure" |
| "google.golang.org/grpc/internal/grpctest" |
| "google.golang.org/grpc/internal/stubserver" |
| "google.golang.org/grpc/internal/testutils" |
| "google.golang.org/grpc/internal/testutils/xds/e2e" |
| "google.golang.org/grpc/internal/xds/bootstrap" |
| "google.golang.org/grpc/internal/xds/xdsclient" |
| "google.golang.org/grpc/peer" |
| "google.golang.org/grpc/status" |
| "google.golang.org/grpc/xds" |
| |
| v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" |
| v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" |
| v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" |
| testgrpc "google.golang.org/grpc/interop/grpc_testing" |
| testpb "google.golang.org/grpc/interop/grpc_testing" |
| ) |
| |
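// s wraps grpctest.Tester, which provides common setup and teardown behavior
// (including a leak check) for every subtest run via grpctest.RunSubTests.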
| type s struct { |
| grpctest.Tester |
| } |
| |
| func Test(t *testing.T) { |
| grpctest.RunSubTests(t, s{}) |
| } |
| |
| const ( |
| defaultTestTimeout = 10 * time.Second |
| defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. |
| ) |
| |
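// hostPortFromListener returns the host and numeric port on which the given
// listener is accepting connections.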
| func hostPortFromListener(lis net.Listener) (string, uint32, error) { |
| host, p, err := net.SplitHostPort(lis.Addr().String()) |
| if err != nil { |
| return "", 0, fmt.Errorf("net.SplitHostPort(%s) failed: %v", lis.Addr().String(), err) |
| } |
| port, err := strconv.ParseInt(p, 10, 32) |
| if err != nil { |
| return "", 0, fmt.Errorf("strconv.ParseInt(%s, 10, 32) failed: %v", p, err) |
| } |
| return host, uint32(port), nil |
| } |
| |
| // servingModeChangeHandler handles changes to the serving mode of an |
| // xDS-enabled gRPC server. It logs the changes and sends the new mode and any |
| // errors on appropriate channels for the test to consume. |
| type servingModeChangeHandler struct { |
| logger interface { |
| Logf(format string, args ...any) |
| } |
	// Access to the fields below is guarded by this mutex.
| mu sync.Mutex |
| modeCh chan connectivity.ServingMode |
| errCh chan error |
| currentMode connectivity.ServingMode |
| currentErr error |
| } |
| |
| func newServingModeChangeHandler(t *testing.T) *servingModeChangeHandler { |
| return &servingModeChangeHandler{ |
| logger: t, |
| modeCh: make(chan connectivity.ServingMode, 1), |
| errCh: make(chan error, 1), |
| } |
| } |
| |
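// modeChangeCallback is registered with the xDS-enabled gRPC server via
// xds.ServingModeCallback, and is invoked whenever the serving mode of a
// listener changes.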
| func (m *servingModeChangeHandler) modeChangeCallback(addr net.Addr, args xds.ServingModeChangeArgs) { |
| m.mu.Lock() |
| defer m.mu.Unlock() |
	// Suppress duplicate updates when the mode stays NOT_SERVING and the
	// error is unchanged.
	//
	// TODO(purnesh42h): Should we move this check to the listener wrapper?
	// This shouldn't happen often in practice, but we never know what kind
	// of management servers users run.
| if m.currentMode == args.Mode && m.currentMode == connectivity.ServingModeNotServing && m.currentErr.Error() == args.Err.Error() { |
| return |
| } |
| m.logger.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) |
| m.modeCh <- args.Mode |
| m.currentMode = args.Mode |
| if args.Err != nil { |
| m.errCh <- args.Err |
| } |
| m.currentErr = args.Err |
| } |
| |
// createStubServer creates a new xDS-enabled gRPC server and returns a
// stubserver.StubServer that can be used for testing. The server is
// configured with the provided grpc.ServerOptions, which typically include
// a serving mode callback and an xDS client pool.
| func createStubServer(t *testing.T, lis net.Listener, opts ...grpc.ServerOption) *stubserver.StubServer { |
| stub := &stubserver.StubServer{ |
| Listener: lis, |
| EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { |
| return &testpb.Empty{}, nil |
| }, |
| FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { |
| for { |
| if _, err := stream.Recv(); err == io.EOF { |
| return nil |
| } else if err != nil { |
| return err |
| } |
| } |
| }, |
| } |
| server, err := xds.NewGRPCServer(opts...) |
| if err != nil { |
| t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) |
| } |
| stub.S = server |
| stubserver.StartTestService(t, stub) |
| t.Cleanup(stub.Stop) |
| return stub |
| } |
| |
| // waitForSuccessfulRPC waits for an RPC to succeed, repeatedly calling the |
| // EmptyCall RPC on the provided client connection until the call succeeds. |
// If the context expires before an RPC succeeds, the test fails.
| func waitForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn, opts ...grpc.CallOption) { |
| t.Helper() |
| |
| client := testgrpc.NewTestServiceClient(cc) |
| for { |
| select { |
| case <-ctx.Done(): |
| t.Fatalf("Timeout waiting for RPCs to succeed") |
| case <-time.After(defaultTestShortTimeout): |
| if _, err := client.EmptyCall(ctx, &testpb.Empty{}, opts...); err == nil { |
| return |
| } |
| } |
| } |
| } |
| |
| // waitForFailedRPCWithStatus waits for an RPC to fail with the expected status |
| // code, error message, and node ID. It repeatedly calls the EmptyCall RPC on |
| // the provided client connection until the error matches the expected values. |
// If the error does not match the expected values before the context
// expires, the test fails.
| func waitForFailedRPCWithStatus(ctx context.Context, t *testing.T, cc *grpc.ClientConn, wantCode codes.Code, wantErr, wantNodeID string) { |
| t.Helper() |
| |
| client := testgrpc.NewTestServiceClient(cc) |
| var err error |
| for { |
| select { |
| case <-ctx.Done(): |
| t.Fatalf("RPCs failed with most recent error: %v. Want status code %v, error: %s, node id: %s", err, wantCode, wantErr, wantNodeID) |
| case <-time.After(defaultTestShortTimeout): |
| _, err = client.EmptyCall(ctx, &testpb.Empty{}) |
| if gotCode := status.Code(err); gotCode != wantCode { |
| continue |
| } |
| if gotErr := err.Error(); !strings.Contains(gotErr, wantErr) { |
| continue |
| } |
| if !strings.Contains(err.Error(), wantNodeID) { |
| continue |
| } |
| t.Logf("Most recent error happy case: %v", err.Error()) |
| return |
| } |
| } |
| } |
| |
// Tests the basic scenario for an xDS-enabled gRPC server.
//
//   - Verifies that the xDS-enabled gRPC server requests the expected
//     listener resource.
//   - Once the listener resource is received from the management server,
//     verifies that the server requests the appropriate route configuration
//     name. Also verifies that, at this point, the server has not yet
//     started serving RPCs.
//   - Once the route configuration is received from the management server,
//     verifies that the server can serve RPCs successfully.
| func (s) TestServer_Basic(t *testing.T) { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| |
| // Start an xDS management server. |
| listenerNamesCh := make(chan []string, 1) |
| routeNamesCh := make(chan []string, 1) |
| managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ |
| OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { |
			// The test only needs to verify the names of the resources being
			// subscribed to; ACKs can be skipped.
| if req.GetVersionInfo() != "" { |
| return nil |
| } |
| switch req.GetTypeUrl() { |
| case "type.googleapis.com/envoy.config.listener.v3.Listener": |
| select { |
| case listenerNamesCh <- req.GetResourceNames(): |
| case <-ctx.Done(): |
| } |
| case "type.googleapis.com/envoy.config.route.v3.RouteConfiguration": |
| select { |
| case routeNamesCh <- req.GetResourceNames(): |
| case <-ctx.Done(): |
| } |
| } |
| return nil |
| }, |
| AllowResourceSubset: true, |
| }) |
| |
| // Create bootstrap configuration pointing to the above management server. |
| nodeID := uuid.New().String() |
| bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address) |
| |
	// Create a listener on a local port to act as the xDS-enabled gRPC server.
| lis, err := testutils.LocalTCPListener() |
| if err != nil { |
| t.Fatalf("Failed to listen to local port: %v", err) |
| } |
| host, port, err := hostPortFromListener(lis) |
| if err != nil { |
| t.Fatalf("Failed to retrieve host and port of server: %v", err) |
| } |
| |
	// Configure the management server with a listener resource for the above
	// xDS-enabled gRPC server.
| const routeConfigName = "routeName" |
| resources := e2e.UpdateOptions{ |
| NodeID: nodeID, |
		Listeners:      []*v3listenerpb.Listener{e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, routeConfigName)},
| SkipValidation: true, |
| } |
| if err := managementServer.Update(ctx, resources); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Start an xDS-enabled gRPC server with the above bootstrap configuration. |
| config, err := bootstrap.NewConfigFromContents(bootstrapContents) |
| if err != nil { |
| t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err) |
| } |
| pool := xdsclient.NewPool(config) |
| modeChangeHandler := newServingModeChangeHandler(t) |
| modeChangeOpt := xds.ServingModeCallback(modeChangeHandler.modeChangeCallback) |
| createStubServer(t, lis, modeChangeOpt, xds.ClientPoolForTesting(pool)) |
| |
| // Wait for the expected listener resource to be requested. |
| wantLisResourceNames := []string{fmt.Sprintf(e2e.ServerListenerResourceNameTemplate, net.JoinHostPort(host, strconv.Itoa(int(port))))} |
| select { |
| case <-ctx.Done(): |
| t.Fatal("Timeout waiting for the expected listener resource to be requested") |
| case gotLisResourceName := <-listenerNamesCh: |
| if !cmp.Equal(gotLisResourceName, wantLisResourceNames) { |
| t.Fatalf("Got unexpected listener resource names: %v, want %v", gotLisResourceName, wantLisResourceNames) |
| } |
| } |
| |
| // Wait for the expected route config resource to be requested. |
| select { |
| case <-ctx.Done(): |
| t.Fatal("Timeout waiting for the expected route config resource to be requested") |
| case gotRouteNames := <-routeNamesCh: |
| if !cmp.Equal(gotRouteNames, []string{routeConfigName}) { |
| t.Fatalf("Got unexpected route config resource names: %v, want %v", gotRouteNames, []string{routeConfigName}) |
| } |
| } |
| |
| // Ensure that the server is not serving RPCs yet. |
| sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) |
| defer sCancel() |
| select { |
| case <-sCtx.Done(): |
| case <-modeChangeHandler.modeCh: |
| t.Fatal("Server started serving RPCs before the route config was received") |
| } |
| |
	// Create a gRPC channel to the xDS-enabled server.
| cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) |
| if err != nil { |
| t.Fatalf("grpc.NewClient(%q) failed: %v", lis.Addr(), err) |
| } |
| defer cc.Close() |
| |
	// Verify that RPCs fail with an Unavailable status since the server is
	// not yet serving.
| client := testgrpc.NewTestServiceClient(cc) |
| if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.Unavailable { |
| t.Fatalf("EmptyCall() returned %v, want %v", err, codes.Unavailable) |
| } |
| |
| // Configure the management server with the expected route config resource, |
	// and expect RPCs to succeed.
| resources.Routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigNonForwardingAction(routeConfigName)} |
| if err := managementServer.Update(ctx, resources); err != nil { |
| t.Fatal(err) |
| } |
| select { |
| case <-ctx.Done(): |
| t.Fatal("Timeout waiting for the server to start serving RPCs") |
| case gotMode := <-modeChangeHandler.modeCh: |
| if gotMode != connectivity.ServingModeServing { |
| t.Fatalf("Mode changed to %v, want %v", gotMode, connectivity.ServingModeServing) |
| } |
| } |
| waitForSuccessfulRPC(ctx, t, cc) |
| } |
| |
| // Tests that the xDS-enabled gRPC server cleans up all its resources when all |
| // connections to it are closed. |
| func (s) TestServer_ConnectionCleanup(t *testing.T) { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| |
| // Start an xDS management server. |
| managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) |
| |
| // Create bootstrap configuration pointing to the above management server. |
| nodeID := uuid.New().String() |
| bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address) |
| |
	// Create a listener on a local port to act as the xDS-enabled gRPC server.
| lis, err := testutils.LocalTCPListener() |
| if err != nil { |
| t.Fatalf("Failed to listen to local port: %v", err) |
| } |
| host, port, err := hostPortFromListener(lis) |
| if err != nil { |
| t.Fatalf("Failed to retrieve host and port of server: %v", err) |
| } |
| |
	// Configure the management server with a listener and a route
	// configuration resource for the above xDS-enabled gRPC server.
| const routeConfigName = "routeName" |
| resources := e2e.UpdateOptions{ |
| NodeID: nodeID, |
		Listeners:      []*v3listenerpb.Listener{e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, routeConfigName)},
| Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigNonForwardingAction(routeConfigName)}, |
| SkipValidation: true, |
| } |
| if err := managementServer.Update(ctx, resources); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Start an xDS-enabled gRPC server with the above bootstrap configuration. |
| config, err := bootstrap.NewConfigFromContents(bootstrapContents) |
| if err != nil { |
| t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err) |
| } |
| pool := xdsclient.NewPool(config) |
| modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) { |
| t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) |
| }) |
| createStubServer(t, lis, modeChangeOpt, xds.ClientPoolForTesting(pool)) |
| |
| // Create a gRPC channel and verify that RPCs succeed. |
| cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) |
| if err != nil { |
| t.Fatalf("grpc.NewClient(%q) failed: %v", lis.Addr(), err) |
| } |
| defer cc.Close() |
| waitForSuccessfulRPC(ctx, t, cc) |
| |
| // Create multiple channels to the server, and make an RPC on each one. When |
| // everything is closed, the server should have cleaned up all its resources |
	// as well (and this will be verified by the leak checker).
| const numConns = 100 |
| var wg sync.WaitGroup |
| wg.Add(numConns) |
| for range numConns { |
| go func() { |
| defer wg.Done() |
| cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) |
			if err != nil {
				t.Errorf("grpc.NewClient failed with err: %v", err)
				return
			}
| client := testgrpc.NewTestServiceClient(cc) |
| if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { |
| t.Errorf("EmptyCall() failed: %v", err) |
| } |
| cc.Close() |
| }() |
| } |
| wg.Wait() |
| } |
| |
| // Tests that multiple xDS-enabled gRPC servers can be created with different |
| // bootstrap configurations, and that they correctly request different LDS |
| // resources from the management server based on their respective listening |
| // ports. It also ensures that gRPC clients can connect to the intended server |
| // and that RPCs function correctly. The test uses the grpc.Peer() call option |
| // to validate that the client is connected to the correct server. |
| func (s) TestServer_MultipleServers_DifferentBootstrapConfigurations(t *testing.T) { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) |
| |
| // Create two bootstrap configurations pointing to the above management server. |
| nodeID1 := uuid.New().String() |
| bootstrapContents1 := e2e.DefaultBootstrapContents(t, nodeID1, mgmtServer.Address) |
| nodeID2 := uuid.New().String() |
| bootstrapContents2 := e2e.DefaultBootstrapContents(t, nodeID2, mgmtServer.Address) |
| |
| // Create two xDS-enabled gRPC servers using the above bootstrap configs. |
| lis1, err := testutils.LocalTCPListener() |
| if err != nil { |
| t.Fatalf("testutils.LocalTCPListener() failed: %v", err) |
| } |
| lis2, err := testutils.LocalTCPListener() |
| if err != nil { |
| t.Fatalf("testutils.LocalTCPListener() failed: %v", err) |
| } |
| |
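	// Register a serving mode callback for each server so that the test can
	// observe mode transitions independently.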
| modeChangeHandler1 := newServingModeChangeHandler(t) |
| modeChangeOpt1 := xds.ServingModeCallback(modeChangeHandler1.modeChangeCallback) |
| modeChangeHandler2 := newServingModeChangeHandler(t) |
| modeChangeOpt2 := xds.ServingModeCallback(modeChangeHandler2.modeChangeCallback) |
| config1, err := bootstrap.NewConfigFromContents(bootstrapContents1) |
| if err != nil { |
| t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents1), err) |
| } |
| pool1 := xdsclient.NewPool(config1) |
| config2, err := bootstrap.NewConfigFromContents(bootstrapContents2) |
| if err != nil { |
| t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents2), err) |
| } |
| pool2 := xdsclient.NewPool(config2) |
| createStubServer(t, lis1, modeChangeOpt1, xds.ClientPoolForTesting(pool1)) |
| createStubServer(t, lis2, modeChangeOpt2, xds.ClientPoolForTesting(pool2)) |
| |
| // Update the management server with the listener resources pointing to the |
| // corresponding gRPC servers. |
| host1, port1, err := hostPortFromListener(lis1) |
| if err != nil { |
| t.Fatalf("Failed to retrieve host and port of server: %v", err) |
| } |
| host2, port2, err := hostPortFromListener(lis2) |
| if err != nil { |
| t.Fatalf("Failed to retrieve host and port of server: %v", err) |
| } |
| |
| resources1 := e2e.UpdateOptions{ |
| NodeID: nodeID1, |
| Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host1, port1, e2e.SecurityLevelNone, "routeName")}, |
| } |
| if err := mgmtServer.Update(ctx, resources1); err != nil { |
| t.Fatal(err) |
| } |
| |
| resources2 := e2e.UpdateOptions{ |
| NodeID: nodeID2, |
| Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host2, port2, e2e.SecurityLevelNone, "routeName")}, |
| } |
| if err := mgmtServer.Update(ctx, resources2); err != nil { |
| t.Fatal(err) |
| } |
| |
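	// Wait for both xDS-enabled gRPC servers to transition to SERVING mode.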
| select { |
| case <-ctx.Done(): |
| t.Fatal("Timeout waiting for the xDS-enabled gRPC server to go SERVING") |
| case gotMode := <-modeChangeHandler1.modeCh: |
| if gotMode != connectivity.ServingModeServing { |
| t.Fatalf("Mode changed to %v, want %v", gotMode, connectivity.ServingModeServing) |
| } |
| } |
| select { |
| case <-ctx.Done(): |
| t.Fatal("Timeout waiting for the xDS-enabled gRPC server to go SERVING") |
| case gotMode := <-modeChangeHandler2.modeCh: |
| if gotMode != connectivity.ServingModeServing { |
| t.Fatalf("Mode changed to %v, want %v", gotMode, connectivity.ServingModeServing) |
| } |
| } |
| |
| // Create two gRPC clients, one for each server. |
| cc1, err := grpc.NewClient(lis1.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) |
| if err != nil { |
| t.Fatalf("Failed to create client for test server 1: %s, %v", lis1.Addr().String(), err) |
| } |
| defer cc1.Close() |
| |
| cc2, err := grpc.NewClient(lis2.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) |
| if err != nil { |
| t.Fatalf("Failed to create client for test server 2: %s, %v", lis2.Addr().String(), err) |
| } |
| defer cc2.Close() |
| |
	// Both unary RPCs should work once the servers transition into serving.
| var peer1 peer.Peer |
| waitForSuccessfulRPC(ctx, t, cc1, grpc.Peer(&peer1)) |
| if peer1.Addr.String() != lis1.Addr().String() { |
| t.Errorf("Connected to wrong peer: %s, want %s", peer1.Addr, lis1.Addr()) |
| } |
| |
| var peer2 peer.Peer |
| waitForSuccessfulRPC(ctx, t, cc2, grpc.Peer(&peer2)) |
| if peer2.Addr.String() != lis2.Addr().String() { |
| t.Errorf("Connected to wrong peer: %s, want %s", peer2.Addr, lis2.Addr()) |
| } |
| } |