balancer/randomsubsetting: Implementation of the random_subsetting LB policy (#8650)

Implements [gRFC
A68](https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md).

Note that this PR only implements the LB policy and does not implement
the xDS integration specified here:
https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md#xds-integration

RELEASE NOTES:
- balancer/randomsubsetting: Implementation of the `random_subsetting`
LB policy

---------

Signed-off-by: marek-szews <[email protected]>
Co-authored-by: Easwar Swaminathan <[email protected]>
diff --git a/balancer/randomsubsetting/randomsubsetting.go b/balancer/randomsubsetting/randomsubsetting.go
new file mode 100644
index 0000000..d8e875b
--- /dev/null
+++ b/balancer/randomsubsetting/randomsubsetting.go
@@ -0,0 +1,192 @@
+/*
+ *
+ * Copyright 2025 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package randomsubsetting implements the random_subsetting LB policy specified
+// here: https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md
+//
+// To install the LB policy, import this package as:
+//
+//	import _ "google.golang.org/grpc/balancer/randomsubsetting"
+//
+// # Experimental
+//
+// Notice: This package is EXPERIMENTAL and may be changed or removed in a
+// later release.
+package randomsubsetting
+
+import (
+	"cmp"
+	"encoding/json"
+	"fmt"
+	"math/rand/v2"
+	"slices"
+
+	xxhash "github.com/cespare/xxhash/v2"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/internal/balancer/gracefulswitch"
+	internalgrpclog "google.golang.org/grpc/internal/grpclog"
+	iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
+	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/serviceconfig"
+)
+
+// Name is the name under which the random_subsetting LB policy is registered
+// (it is what bb.Name returns to the balancer registry). It also serves as the
+// name of this package's logging component.
+const Name = "random_subsetting_experimental"
+
+// Package-level state:
+//   - logger is the component logger that prefixLogger wraps for each
+//     balancer instance.
+//   - randUint64 is the function Build calls to pick a balancer's hash seed.
+//     NOTE(review): presumably kept in a variable so that tests can swap in a
+//     deterministic source; no override is visible here, confirm.
+var (
+	logger     = grpclog.Component(Name)
+	randUint64 = rand.Uint64
+)
+
+// prefixLogger builds a PrefixLogger on top of the package logger. The prefix
+// embeds the pointer value of p, so log lines coming from different balancer
+// instances can be told apart.
+func prefixLogger(p *subsettingBalancer) *internalgrpclog.PrefixLogger {
+	prefix := fmt.Sprintf("[random-subsetting-lb %p] ", p)
+	return internalgrpclog.NewPrefixLogger(logger, prefix)
+}
+
+func init() {
+	balancer.Register(bb{})
+}
+
+// bb is the stateless builder for the random_subsetting policy. It provides
+// the Build, ParseConfig and Name methods and is what init registers with the
+// balancer package.
+type bb struct{}
+
+// Build creates a new random_subsetting balancer. The returned balancer embeds
+// a gracefulswitch balancer built from cc and bOpts, and is given a seed drawn
+// from randUint64 together with its own xxhash digest for hashing endpoints.
+func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
+	sb := new(subsettingBalancer)
+	sb.Balancer = gracefulswitch.NewBalancer(cc, bOpts)
+	sb.hashSeed = randUint64()
+	sb.hashDigest = xxhash.New()
+
+	// The logger prefix contains the balancer's address, so it can only be
+	// created once the balancer itself exists.
+	sb.logger = prefixLogger(sb)
+	sb.logger.Infof("Created")
+	return sb
+}
+
+// lbConfig is the parsed configuration of the random_subsetting policy, as
+// produced by bb.ParseConfig.
+//
+// SubsetSize (JSON "subsetSize") is the maximum number of endpoints that
+// calculateSubset passes on; ParseConfig rejects a value of 0.
+//
+// ChildPolicy (JSON "childPolicy") names and configures the policy that
+// receives the subset; ParseConfig rejects a config without it.
+type lbConfig struct {
+	serviceconfig.LoadBalancingConfig `json:"-"`
+
+	SubsetSize  uint32                         `json:"subsetSize,omitempty"`
+	ChildPolicy *iserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
+}
+
+func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+	lbCfg := &lbConfig{}
+
+	// Ensure that the specified child policy is registered and validates its
+	// config, if present.
+	if err := json.Unmarshal(s, lbCfg); err != nil {
+		return nil, fmt.Errorf("randomsubsetting: json.Unmarshal failed for configuration: %s with error: %v", string(s), err)
+	}
+	if lbCfg.SubsetSize == 0 {
+		return nil, fmt.Errorf("randomsubsetting: SubsetSize must be greater than 0")
+	}
+	if lbCfg.ChildPolicy == nil {
+		return nil, fmt.Errorf("randomsubsetting: ChildPolicy must be specified")
+	}
+
+	return lbCfg, nil
+}
+
+// Name returns the package-level Name constant, i.e. the name under which
+// this builder is registered.
+func (bb) Name() string {
+	return Name
+}
+
+// subsettingBalancer is the random_subsetting LB policy. It embeds a
+// gracefulswitch balancer, to which UpdateClientConnState forwards the child
+// policy config and the selected subset of endpoints.
+//
+//   - logger is the per-instance prefixed logger set up in Build.
+//   - cfg is the most recent config stored by UpdateClientConnState;
+//     calculateSubset reads SubsetSize from it.
+//   - hashSeed is chosen once in Build and seeds every endpoint hash.
+//   - hashDigest is the xxhash digest that calculateSubset resets and reuses
+//     for each endpoint.
+type subsettingBalancer struct {
+	*gracefulswitch.Balancer
+
+	logger     *internalgrpclog.PrefixLogger
+	cfg        *lbConfig
+	hashSeed   uint64
+	hashDigest *xxhash.Digest
+}
+
+// UpdateClientConnState handles a new resolver state and LB policy config. It
+// reduces the resolver's endpoints to the subset chosen by calculateSubset and
+// forwards that subset, together with the configured child policy, to the
+// embedded gracefulswitch balancer.
+//
+// It returns balancer.ErrBadResolverState if the config is not an *lbConfig,
+// a wrapped error if the child policy config cannot be marshaled or parsed,
+// and otherwise whatever the embedded balancer returns.
+func (b *subsettingBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
+	lbCfg, ok := s.BalancerConfig.(*lbConfig)
+	if !ok {
+		b.logger.Warningf("Received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
+		return balancer.ErrBadResolverState
+	}
+
+	// Build config for the gracefulswitch balancer: a JSON list holding a
+	// single {childPolicyName: childPolicyConfig} object. A marshaling error
+	// is not expected, since the config was already validated as part of
+	// ParseConfig(), but it is reported rather than dropped so that a failure
+	// is not misattributed to gracefulswitch.ParseConfig().
+	childCfg := []map[string]any{{lbCfg.ChildPolicy.Name: lbCfg.ChildPolicy.Config}}
+	childJSON, err := json.Marshal(childCfg)
+	if err != nil {
+		return fmt.Errorf("randomsubsetting: error marshaling config for child of type %q: %w", lbCfg.ChildPolicy.Name, err)
+	}
+	parsedCfg, err := gracefulswitch.ParseConfig(childJSON)
+	if err != nil {
+		return fmt.Errorf("randomsubsetting: error switching to child of type %q: %w", lbCfg.ChildPolicy.Name, err)
+	}
+
+	// calculateSubset reads the subset size from b.cfg, so the new config must
+	// be stored before the subset is computed.
+	b.cfg = lbCfg
+	childState := resolver.State{
+		Endpoints:     b.calculateSubset(s.ResolverState.Endpoints),
+		ServiceConfig: s.ResolverState.ServiceConfig,
+		Attributes:    s.ResolverState.Attributes,
+	}
+
+	return b.Balancer.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState:  childState,
+		BalancerConfig: parsedCfg,
+	})
+}
+
+// calculateSubset implements the subsetting algorithm, as described in A68:
+// https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md#subsetting-algorithm
+//
+// If the number of endpoints does not exceed the configured subset size, the
+// input slice is returned as is. Otherwise every endpoint is hashed with the
+// balancer's seed, the endpoints are sorted by hash, and the SubsetSize
+// endpoints with the smallest hashes are returned in ascending hash order.
+func (b *subsettingBalancer) calculateSubset(endpoints []resolver.Endpoint) []resolver.Endpoint {
+	// Compare as uint64: converting a large uint32 SubsetSize to int would
+	// wrap to a negative value on platforms where int is 32 bits wide.
+	if uint64(len(endpoints)) <= uint64(b.cfg.SubsetSize) {
+		return endpoints
+	}
+	// Past the check above, SubsetSize < len(endpoints), so it fits in an int.
+	subsetSize := int(b.cfg.SubsetSize)
+
+	// A helper struct to hold an endpoint and its hash.
+	type endpointWithHash struct {
+		hash uint64
+		ep   resolver.Endpoint
+	}
+
+	hashedEndpoints := make([]endpointWithHash, len(endpoints))
+	for i, endpoint := range endpoints {
+		// For every endpoint in the list, compute a hash with previously
+		// generated seed - A68.
+		//
+		// The xxhash package's Sum64() function does not allow setting a seed.
+		// This means that we need to reset the digest with the seed for every
+		// endpoint. Without this, an endpoint will not retain the same hash
+		// across resolver updates.
+		b.hashDigest.ResetWithSeed(b.hashSeed)
+
+		// Note that we only hash the first address of the endpoint, as per
+		// A68. An endpoint without any address is hashed as empty input
+		// instead of causing an out-of-range panic.
+		if len(endpoint.Addresses) > 0 {
+			b.hashDigest.WriteString(endpoint.Addresses[0].String())
+		}
+		hashedEndpoints[i] = endpointWithHash{
+			hash: b.hashDigest.Sum64(),
+			ep:   endpoint,
+		}
+	}
+
+	slices.SortFunc(hashedEndpoints, func(x, y endpointWithHash) int {
+		// Note: This uses the standard library cmp package, not the
+		// github.com/google/go-cmp/cmp package. The latter is intended for
+		// testing purposes only.
+		return cmp.Compare(x.hash, y.hash)
+	})
+
+	// Convert back to resolver.Endpoints, keeping only the first subsetSize
+	// entries of the sorted list.
+	endpointSubset := make([]resolver.Endpoint, subsetSize)
+	for i, hashed := range hashedEndpoints[:subsetSize] {
+		endpointSubset[i] = hashed.ep
+	}
+
+	return endpointSubset
+}
diff --git a/balancer/randomsubsetting/randomsubsetting_test.go b/balancer/randomsubsetting/randomsubsetting_test.go
new file mode 100644
index 0000000..a23f1ab
--- /dev/null
+++ b/balancer/randomsubsetting/randomsubsetting_test.go
@@ -0,0 +1,261 @@
+/*
+ *
+ * Copyright 2025 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package randomsubsetting
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"strings"
+	"testing"
+	"time"
+
+	xxhash "github.com/cespare/xxhash/v2"
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/internal/balancer/stub"
+	"google.golang.org/grpc/internal/grpctest"
+	iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
+	"google.golang.org/grpc/internal/testutils"
+	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/serviceconfig"
+
+	_ "google.golang.org/grpc/balancer/roundrobin" // For round_robin LB policy in tests
+)
+
+// defaultTestTimeout bounds the context that tests in this file use when
+// waiting for asynchronous events.
+var defaultTestTimeout = 5 * time.Second
+
+// s is the receiver type of the test methods in this file. It embeds
+// grpctest.Tester so that Test can pass it to grpctest.RunSubTests.
+type s struct {
+	grpctest.Tester
+}
+
+// Test is the only top-level test function in this file; it hands an s value
+// to grpctest.RunSubTests.
+// NOTE(review): RunSubTests presumably runs each (s) Test* method as a
+// subtest; confirm against the grpctest package.
+func Test(t *testing.T) {
+	grpctest.RunSubTests(t, s{})
+}
+
+func (s) TestParseConfig(t *testing.T) {
+	parser := bb{}
+	tests := []struct {
+		name    string
+		input   string
+		wantCfg serviceconfig.LoadBalancingConfig
+		wantErr string
+	}{
+		{
+			name:    "invalid_json",
+			input:   "{{invalidjson{{",
+			wantErr: "json.Unmarshal failed for configuration",
+		},
+		{
+			name:    "empty_config",
+			input:   `{}`,
+			wantErr: "SubsetSize must be greater than 0",
+		},
+		{
+			name:    "subset_size_zero",
+			input:   `{ "subsetSize": 0 }`,
+			wantErr: "SubsetSize must be greater than 0",
+		},
+		{
+			name:    "child_policy_missing",
+			input:   `{ "subsetSize": 1 }`,
+			wantErr: "ChildPolicy must be specified",
+		},
+		{
+			name:    "child_policy_not_registered",
+			input:   `{ "subsetSize": 1 , "childPolicy": [{"unregistered_lb": {}}] }`,
+			wantErr: "no supported policies found",
+		},
+		{
+			name:  "success",
+			input: `{ "subsetSize": 3, "childPolicy": [{"round_robin": {}}]}`,
+			wantCfg: &lbConfig{
+				SubsetSize:  3,
+				ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"},
+			},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			gotCfg, gotErr := parser.ParseConfig(json.RawMessage(test.input))
+			// Substring match makes this very tightly coupled to the
+			// internalserviceconfig.BalancerConfig error strings. However, it
+			// is important to distinguish the different types of error messages
+			// possible as the parser has a few defined buckets of ways it can
+			// error out.
+			if (gotErr != nil) != (test.wantErr != "") {
+				t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
+			}
+			if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) {
+				t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
+			}
+			if test.wantErr != "" {
+				return
+			}
+			if diff := cmp.Diff(test.wantCfg, gotCfg); diff != "" {
+				t.Fatalf("ParseConfig(%s) got unexpected output, diff (-want +got):\n%s", test.input, diff)
+			}
+		})
+	}
+}
+
+// makeEndpoints returns n endpoints, each with a single address named
+// "endpoint-<i>" for i in [0, n).
+func makeEndpoints(n int) []resolver.Endpoint {
+	eps := make([]resolver.Endpoint, 0, n)
+	for i := range n {
+		addr := resolver.Address{Addr: fmt.Sprintf("endpoint-%d", i)}
+		eps = append(eps, resolver.Endpoint{Addresses: []resolver.Address{addr}})
+	}
+	return eps
+}
+
+func (s) TestCalculateSubset_Simple(t *testing.T) {
+	tests := []struct {
+		name       string
+		endpoints  []resolver.Endpoint
+		subsetSize uint32
+		want       []resolver.Endpoint
+	}{
+		{
+			name:       "NoEndpoints",
+			endpoints:  []resolver.Endpoint{},
+			subsetSize: 3,
+			want:       []resolver.Endpoint{},
+		},
+		{
+			name:       "SubsetSizeLargerThanNumberOfEndpoints",
+			endpoints:  makeEndpoints(5),
+			subsetSize: 10,
+			want:       makeEndpoints(5),
+		},
+		{
+			name:       "SubsetSizeEqualToNumberOfEndpoints",
+			endpoints:  makeEndpoints(5),
+			subsetSize: 5,
+			want:       makeEndpoints(5),
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			b := &subsettingBalancer{
+				cfg:        &lbConfig{SubsetSize: tt.subsetSize},
+				hashSeed:   0,
+				hashDigest: xxhash.New(),
+			}
+			got := b.calculateSubset(tt.endpoints)
+			if diff := cmp.Diff(tt.want, got); diff != "" {
+				t.Errorf("calculateSubset() returned diff (-want +got):\n%s", diff)
+			}
+		})
+	}
+}
+
+// TestCalculateSubset_EndpointsRetainHashValues calls calculateSubset ten
+// times on the same balancer and the same ten endpoints, and checks that each
+// call yields the same five endpoints in the same order.
+func (s) TestCalculateSubset_EndpointsRetainHashValues(t *testing.T) {
+	const subsetSize = 5
+	endpoints := makeEndpoints(10)
+
+	// With a fixed seed the subset is fully determined by the hashes, so the
+	// expected selection (endpoint-6, -0, -1, -7, -3) is hardcoded here as
+	// indices into the input.
+	wantOrder := []int{6, 0, 1, 7, 3}
+	want := make([]resolver.Endpoint, 0, len(wantOrder))
+	for _, idx := range wantOrder {
+		want = append(want, endpoints[idx])
+	}
+
+	lb := &subsettingBalancer{
+		hashSeed:   0,
+		hashDigest: xxhash.New(),
+		cfg:        &lbConfig{SubsetSize: subsetSize},
+	}
+	for attempt := 0; attempt < 10; attempt++ {
+		diff := cmp.Diff(want, lb.calculateSubset(endpoints))
+		if diff != "" {
+			t.Fatalf("calculateSubset() returned diff (-want +got):\n%s", diff)
+		}
+	}
+}
+
+// TestSubsettingBalancer_DeterministicSubset sends the same resolver state
+// and config to a random_subsetting balancer twice. It checks that the child
+// policy receives five of the ten endpoints on the first update and exactly
+// the same endpoints on the second.
+func (s) TestSubsettingBalancer_DeterministicSubset(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	// The child policy of the random_subsetting balancer is a stub balancer
+	// that pushes every update it receives onto childUpdates.
+	const childName = "stub-child-balancer"
+	childUpdates := make(chan balancer.ClientConnState, 1)
+	stub.Register(childName, stub.BalancerFuncs{
+		UpdateClientConnState: func(_ *stub.BalancerData, ccs balancer.ClientConnState) error {
+			select {
+			case childUpdates <- ccs:
+			case <-ctx.Done():
+			}
+			return nil
+		},
+	})
+
+	// recvChildUpdate waits for the next update seen by the child policy and
+	// fails the test with timeoutMsg if the test context expires first.
+	recvChildUpdate := func(timeoutMsg string) balancer.ClientConnState {
+		select {
+		case ccs := <-childUpdates:
+			return ccs
+		case <-ctx.Done():
+			t.Fatal(timeoutMsg)
+		}
+		return balancer.ClientConnState{}
+	}
+
+	// Create a random_subsetting balancer.
+	tcc := testutils.NewBalancerClientConn(t)
+	rsb := balancer.Get(Name).Build(tcc, balancer.BuildOptions{})
+	defer rsb.Close()
+
+	// The state handed to the balancer: ten endpoints, a subset size of five,
+	// and the stub balancer as child policy.
+	childPolicy := &iserviceconfig.BalancerConfig{Name: childName}
+	state := balancer.ClientConnState{
+		ResolverState:  resolver.State{Endpoints: makeEndpoints(10)},
+		BalancerConfig: &lbConfig{SubsetSize: 5, ChildPolicy: childPolicy},
+	}
+
+	// First update: the child policy must see exactly five endpoints. They
+	// are kept as the reference for the second update.
+	if err := rsb.UpdateClientConnState(state); err != nil {
+		t.Fatalf("UpdateClientConnState failed: %v", err)
+	}
+	first := recvChildUpdate("Timed out waiting for child policy to receive an update")
+	wantEndpoints := first.ResolverState.Endpoints
+	if n := len(wantEndpoints); n != 5 {
+		t.Fatalf("Child policy received %d endpoints, want 5", n)
+	}
+
+	// Second update with the same state: the child policy must see the same
+	// subset of endpoints.
+	if err := rsb.UpdateClientConnState(state); err != nil {
+		t.Fatalf("Second UpdateClientConnState failed: %v", err)
+	}
+	second := recvChildUpdate("Timed out waiting for second child policy update")
+	if diff := cmp.Diff(wantEndpoints, second.ResolverState.Endpoints); diff != "" {
+		t.Fatalf("Child policy received a different subset of endpoints on second update, diff (-want +got):\n%s", diff)
+	}
+}