balancer/randomsubsetting: Implementation of the random_subsetting LB policy (#8650)
Implements [gRFC
A68](https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md).
Note that this PR only implements the LB policy and does not implement
the xDS integration specified here:
https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md#xds-integration
RELEASE NOTES:
- balancer/randomsubsetting: Implementation of the `random_subsetting`
LB policy (registered under the experimental name
`random_subsetting_experimental`).
---------
Signed-off-by: marek-szews <[email protected]>
Co-authored-by: Easwar Swaminathan <[email protected]>
diff --git a/balancer/randomsubsetting/randomsubsetting.go b/balancer/randomsubsetting/randomsubsetting.go
new file mode 100644
index 0000000..d8e875b
--- /dev/null
+++ b/balancer/randomsubsetting/randomsubsetting.go
@@ -0,0 +1,192 @@
+/*
+ *
+ * Copyright 2025 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package randomsubsetting implements the random_subsetting LB policy specified
+// here: https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md
+//
+// To install the LB policy, import this package as:
+//
+// import _ "google.golang.org/grpc/balancer/randomsubsetting"
+//
+// # Experimental
+//
+// Notice: This package is EXPERIMENTAL and may be changed or removed in a
+// later release.
+package randomsubsetting
+
+import (
+ "cmp"
+ "encoding/json"
+ "fmt"
+ "math/rand/v2"
+ "slices"
+
+ xxhash "github.com/cespare/xxhash/v2"
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal/balancer/gracefulswitch"
+ internalgrpclog "google.golang.org/grpc/internal/grpclog"
+ iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
+ "google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/serviceconfig"
+)
+
+// Name is the name of the random subsetting load balancer. It is the value
+// returned by the builder's Name method.
+const Name = "random_subsetting_experimental"
+
+var (
+ // logger is the component logger for this LB policy. Per-instance prefix
+ // loggers are derived from it by prefixLogger.
+ logger = grpclog.Component(Name)
+ // randUint64 supplies the per-balancer hash seed in Build. It is a package
+ // variable, presumably so tests can substitute a deterministic source —
+ // TODO confirm.
+ randUint64 = rand.Uint64
+)
+
+// prefixLogger returns a logger whose messages are tagged with the address of
+// the given balancer instance, so that output from several instances can be
+// told apart.
+func prefixLogger(p *subsettingBalancer) *internalgrpclog.PrefixLogger {
+	prefix := fmt.Sprintf("[random-subsetting-lb %p] ", p)
+	return internalgrpclog.NewPrefixLogger(logger, prefix)
+}
+
+func init() {
+ balancer.Register(bb{})
+}
+
+// bb is the builder and config parser for the random_subsetting LB policy.
+type bb struct{}
+
+// Build creates a random_subsetting balancer. The child policy is managed by
+// an embedded gracefulswitch balancer. Each instance draws its own random hash
+// seed, so different balancer instances generally order endpoints, and hence
+// choose subsets, differently.
+func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
+	sb := new(subsettingBalancer)
+	sb.Balancer = gracefulswitch.NewBalancer(cc, bOpts)
+	sb.hashSeed = randUint64()
+	sb.hashDigest = xxhash.New()
+	sb.logger = prefixLogger(sb)
+	sb.logger.Infof("Created")
+	return sb
+}
+
+// lbConfig is the parsed configuration of the random_subsetting LB policy, as
+// produced by bb.ParseConfig.
+type lbConfig struct {
+ // Embedded so that *lbConfig satisfies serviceconfig.LoadBalancingConfig;
+ // excluded from JSON.
+ serviceconfig.LoadBalancingConfig `json:"-"`
+
+ // SubsetSize is the maximum number of endpoints forwarded to the child
+ // policy. ParseConfig rejects a value of 0.
+ SubsetSize uint32 `json:"subsetSize,omitempty"`
+ // ChildPolicy is the LB policy that receives the selected subset.
+ // ParseConfig rejects a config in which it is missing.
+ ChildPolicy *iserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
+}
+
+// ParseConfig parses and validates the JSON configuration of the
+// random_subsetting LB policy.
+//
+// Unmarshaling the childPolicy field into an iserviceconfig.BalancerConfig
+// ensures that the specified child policy is registered and that its config is
+// valid. On top of that, subsetSize must be non-zero and childPolicy must be
+// present. Unmarshaling errors are wrapped so callers can inspect them.
+func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+	lbCfg := &lbConfig{}
+	if err := json.Unmarshal(s, lbCfg); err != nil {
+		return nil, fmt.Errorf("randomsubsetting: json.Unmarshal failed for configuration: %s with error: %w", string(s), err)
+	}
+	if lbCfg.SubsetSize == 0 {
+		return nil, fmt.Errorf("randomsubsetting: SubsetSize must be greater than 0")
+	}
+	if lbCfg.ChildPolicy == nil {
+		return nil, fmt.Errorf("randomsubsetting: ChildPolicy must be specified")
+	}
+	return lbCfg, nil
+}
+
+// Name returns the name of this LB policy, i.e. the Name constant.
+func (bb) Name() string {
+ return Name
+}
+
+// subsettingBalancer implements the random_subsetting LB policy. It selects a
+// subset of the endpoints received from the resolver and forwards only that
+// subset to the child policy, which is managed by the embedded gracefulswitch
+// balancer. Methods not defined here are promoted from that balancer.
+type subsettingBalancer struct {
+ *gracefulswitch.Balancer
+
+ logger *internalgrpclog.PrefixLogger
+ // cfg is the most recently accepted config; calculateSubset reads
+ // SubsetSize from it.
+ cfg *lbConfig
+ // hashSeed is drawn once in Build and used for every endpoint hash.
+ hashSeed uint64
+ // hashDigest is reused for all hash computations and is reset with
+ // hashSeed before each endpoint is hashed.
+ hashDigest *xxhash.Digest
+}
+
+// UpdateClientConnState handles a new config and resolver state. It selects a
+// subset of the resolver's endpoints (see calculateSubset) and forwards that
+// subset, together with the configured child policy, to the embedded
+// gracefulswitch balancer.
+func (b *subsettingBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
+	lbCfg, ok := s.BalancerConfig.(*lbConfig)
+	if !ok {
+		b.logger.Warningf("Received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
+		return balancer.ErrBadResolverState
+	}
+
+	// Build config for the gracefulswitch balancer. The child's parsed config
+	// is of an arbitrary, child-defined type, so marshaling it back to JSON is
+	// not guaranteed to succeed just because ParseConfig accepted the original
+	// JSON. Report such a failure instead of passing empty JSON along.
+	cfg := []map[string]any{{lbCfg.ChildPolicy.Name: lbCfg.ChildPolicy.Config}}
+	cfgJSON, err := json.Marshal(cfg)
+	if err != nil {
+		return fmt.Errorf("randomsubsetting: error marshaling config for child of type %q: %w", lbCfg.ChildPolicy.Name, err)
+	}
+	parsedCfg, err := gracefulswitch.ParseConfig(cfgJSON)
+	if err != nil {
+		return fmt.Errorf("randomsubsetting: error switching to child of type %q: %w", lbCfg.ChildPolicy.Name, err)
+	}
+	b.cfg = lbCfg
+
+	// Only the endpoints are subsetted. The service config and attributes
+	// are passed through unchanged; other resolver.State fields (for example,
+	// Addresses) are not forwarded.
+	state := resolver.State{
+		Endpoints:     b.calculateSubset(s.ResolverState.Endpoints),
+		ServiceConfig: s.ResolverState.ServiceConfig,
+		Attributes:    s.ResolverState.Attributes,
+	}
+
+	return b.Balancer.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState:  state,
+		BalancerConfig: parsedCfg,
+	})
+}
+
+// calculateSubset implements the subsetting algorithm, as described in A68:
+// https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md#subsetting-algorithm
+//
+// Each endpoint is hashed by its first address, using the balancer's seed.
+// The endpoints are then sorted by ascending hash, and the first SubsetSize of
+// them are returned. If there are at most SubsetSize endpoints, the input
+// slice itself is returned.
+func (b *subsettingBalancer) calculateSubset(endpoints []resolver.Endpoint) []resolver.Endpoint {
+	// A helper struct to hold an endpoint and its hash.
+	type endpointWithHash struct {
+		hash uint64
+		ep   resolver.Endpoint
+	}
+
+	// Compare in uint64. Converting subsetSize to int would wrap to a negative
+	// value on 32-bit platforms for sizes above math.MaxInt32. That would make
+	// this check false, and the allocation and slicing below would panic.
+	subsetSize := b.cfg.SubsetSize
+	if uint64(len(endpoints)) <= uint64(subsetSize) {
+		return endpoints
+	}
+
+	hashedEndpoints := make([]endpointWithHash, len(endpoints))
+	for i, endpoint := range endpoints {
+		// For every endpoint in the list, compute a hash with previously
+		// generated seed - A68.
+		//
+		// The xxhash package's Sum64() function does not allow setting a seed.
+		// This means that we need to reset the digest with the seed for every
+		// endpoint. Without this, an endpoint will not retain the same hash
+		// across resolver updates.
+		//
+		// Note that we only hash the first address of the endpoint, as per A68.
+		// An endpoint without any address is hashed as empty input instead of
+		// panicking on the index expression.
+		b.hashDigest.ResetWithSeed(b.hashSeed)
+		if len(endpoint.Addresses) > 0 {
+			b.hashDigest.WriteString(endpoint.Addresses[0].String())
+		}
+		hashedEndpoints[i] = endpointWithHash{
+			hash: b.hashDigest.Sum64(),
+			ep:   endpoint,
+		}
+	}
+
+	slices.SortFunc(hashedEndpoints, func(a, b endpointWithHash) int {
+		// Note: This uses the standard library cmp package, not the
+		// github.com/google/go-cmp/cmp package. The latter is intended for
+		// testing purposes only.
+		return cmp.Compare(a.hash, b.hash)
+	})
+
+	// Convert back to resolver.Endpoints. Here subsetSize is strictly less
+	// than len(endpoints), so it is a valid length and slice bound.
+	endpointSubset := make([]resolver.Endpoint, 0, subsetSize)
+	for _, endpoint := range hashedEndpoints[:subsetSize] {
+		endpointSubset = append(endpointSubset, endpoint.ep)
+	}
+
+	return endpointSubset
+}
diff --git a/balancer/randomsubsetting/randomsubsetting_test.go b/balancer/randomsubsetting/randomsubsetting_test.go
new file mode 100644
index 0000000..a23f1ab
--- /dev/null
+++ b/balancer/randomsubsetting/randomsubsetting_test.go
@@ -0,0 +1,261 @@
+/*
+ *
+ * Copyright 2025 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package randomsubsetting
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+
+ xxhash "github.com/cespare/xxhash/v2"
+ "github.com/google/go-cmp/cmp"
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/internal/balancer/stub"
+ "google.golang.org/grpc/internal/grpctest"
+ iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
+ "google.golang.org/grpc/internal/testutils"
+ "google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/serviceconfig"
+
+ _ "google.golang.org/grpc/balancer/roundrobin" // For round_robin LB policy in tests
+)
+
+// defaultTestTimeout bounds how long the tests in this file wait for an
+// expected event. It is never modified, so it is declared as a constant.
+const defaultTestTimeout = 5 * time.Second
+
+// s embeds grpctest.Tester. The test methods defined on s are run as subtests
+// by Test via grpctest.RunSubTests.
+type s struct {
+ grpctest.Tester
+}
+
+// Test is the go test entry point that runs all test methods defined on s.
+func Test(t *testing.T) {
+ grpctest.RunSubTests(t, s{})
+}
+
+func (s) TestParseConfig(t *testing.T) {
+ parser := bb{}
+ tests := []struct {
+ name string
+ input string
+ wantCfg serviceconfig.LoadBalancingConfig
+ wantErr string
+ }{
+ {
+ name: "invalid_json",
+ input: "{{invalidjson{{",
+ wantErr: "json.Unmarshal failed for configuration",
+ },
+ {
+ name: "empty_config",
+ input: `{}`,
+ wantErr: "SubsetSize must be greater than 0",
+ },
+ {
+ name: "subset_size_zero",
+ input: `{ "subsetSize": 0 }`,
+ wantErr: "SubsetSize must be greater than 0",
+ },
+ {
+ name: "child_policy_missing",
+ input: `{ "subsetSize": 1 }`,
+ wantErr: "ChildPolicy must be specified",
+ },
+ {
+ name: "child_policy_not_registered",
+ input: `{ "subsetSize": 1 , "childPolicy": [{"unregistered_lb": {}}] }`,
+ wantErr: "no supported policies found",
+ },
+ {
+ name: "success",
+ input: `{ "subsetSize": 3, "childPolicy": [{"round_robin": {}}]}`,
+ wantCfg: &lbConfig{
+ SubsetSize: 3,
+ ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"},
+ },
+ },
+ }
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ gotCfg, gotErr := parser.ParseConfig(json.RawMessage(test.input))
+ // Substring match makes this very tightly coupled to the
+ // internalserviceconfig.BalancerConfig error strings. However, it
+ // is important to distinguish the different types of error messages
+ // possible as the parser has a few defined buckets of ways it can
+ // error out.
+ if (gotErr != nil) != (test.wantErr != "") {
+ t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
+ }
+ if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) {
+ t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
+ }
+ if test.wantErr != "" {
+ return
+ }
+ if diff := cmp.Diff(test.wantCfg, gotCfg); diff != "" {
+ t.Fatalf("ParseConfig(%s) got unexpected output, diff (-want +got):\n%s", test.input, diff)
+ }
+ })
+ }
+}
+
+// makeEndpoints returns n endpoints. The i-th endpoint has a single address
+// "endpoint-<i>", for i in [0, n).
+func makeEndpoints(n int) []resolver.Endpoint {
+	endpoints := make([]resolver.Endpoint, 0, n)
+	for i := range n {
+		addr := resolver.Address{Addr: fmt.Sprintf("endpoint-%d", i)}
+		endpoints = append(endpoints, resolver.Endpoint{Addresses: []resolver.Address{addr}})
+	}
+	return endpoints
+}
+
+// TestCalculateSubset_Simple covers inputs with no more endpoints than the
+// subset size. For these, calculateSubset is expected to return the endpoints
+// unchanged.
+func (s) TestCalculateSubset_Simple(t *testing.T) {
+	for _, tc := range []struct {
+		name       string
+		endpoints  []resolver.Endpoint
+		subsetSize uint32
+		want       []resolver.Endpoint
+	}{
+		{
+			name:       "NoEndpoints",
+			endpoints:  []resolver.Endpoint{},
+			subsetSize: 3,
+			want:       []resolver.Endpoint{},
+		},
+		{
+			name:       "SubsetSizeLargerThanNumberOfEndpoints",
+			endpoints:  makeEndpoints(5),
+			subsetSize: 10,
+			want:       makeEndpoints(5),
+		},
+		{
+			name:       "SubsetSizeEqualToNumberOfEndpoints",
+			endpoints:  makeEndpoints(5),
+			subsetSize: 5,
+			want:       makeEndpoints(5),
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			// The hash seed is left at its zero value.
+			sb := &subsettingBalancer{hashDigest: xxhash.New()}
+			sb.cfg = &lbConfig{SubsetSize: tc.subsetSize}
+			if diff := cmp.Diff(tc.want, sb.calculateSubset(tc.endpoints)); diff != "" {
+				t.Errorf("calculateSubset() returned diff (-want +got):\n%s", diff)
+			}
+		})
+	}
+}
+
+// TestCalculateSubset_EndpointsRetainHashValues checks that repeated subset
+// computations over the same endpoints, with a fixed seed, always yield the
+// same hardcoded subset. A digest that was not reset between uses would make
+// the hashes drift.
+func (s) TestCalculateSubset_EndpointsRetainHashValues(t *testing.T) {
+	const subsetSize = 5
+	sb := &subsettingBalancer{
+		cfg:        &lbConfig{SubsetSize: subsetSize},
+		hashSeed:   0,
+		hashDigest: xxhash.New(),
+	}
+	endpoints := makeEndpoints(10)
+
+	// With seed 0, each endpoint's hash, and therefore the selected subset, is
+	// deterministic, so the expected output can be hardcoded.
+	want := make([]resolver.Endpoint, 0, subsetSize)
+	for _, addr := range []string{"endpoint-6", "endpoint-0", "endpoint-1", "endpoint-7", "endpoint-3"} {
+		want = append(want, resolver.Endpoint{Addresses: []resolver.Address{{Addr: addr}}})
+	}
+
+	for attempt := 0; attempt < 10; attempt++ {
+		if diff := cmp.Diff(want, sb.calculateSubset(endpoints)); diff != "" {
+			t.Fatalf("calculateSubset() returned diff (-want +got):\n%s", diff)
+		}
+	}
+}
+
+// TestSubsettingBalancer_DeterministicSubset sends the same config and
+// resolver state to a random_subsetting balancer twice. It verifies that:
+//   - the child policy receives the configured number of distinct endpoints,
+//     all drawn from the input, and
+//   - the second update yields exactly the same subset as the first.
+func (s) TestSubsettingBalancer_DeterministicSubset(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	// Register the stub balancer builder, which will be used as the child
+	// policy in the random_subsetting balancer. Every update it receives is
+	// forwarded on updateCh.
+	updateCh := make(chan balancer.ClientConnState, 1)
+	stub.Register("stub-child-balancer", stub.BalancerFuncs{
+		UpdateClientConnState: func(_ *stub.BalancerData, ccs balancer.ClientConnState) error {
+			select {
+			case <-ctx.Done():
+			case updateCh <- ccs:
+			}
+			return nil
+		},
+	})
+
+	// Create a random_subsetting balancer.
+	tcc := testutils.NewBalancerClientConn(t)
+	rsb := balancer.Get(Name).Build(tcc, balancer.BuildOptions{})
+	defer rsb.Close()
+
+	// Prepare the configuration and resolver state to be passed to the
+	// random_subsetting balancer. Also index the input endpoints by address,
+	// so that the subset received by the child can be checked for membership.
+	const subsetSize = 5
+	endpoints := makeEndpoints(10)
+	inputAddrs := make(map[string]bool, len(endpoints))
+	for _, ep := range endpoints {
+		inputAddrs[ep.Addresses[0].Addr] = true
+	}
+	state := balancer.ClientConnState{
+		ResolverState: resolver.State{Endpoints: endpoints},
+		BalancerConfig: &lbConfig{
+			SubsetSize:  subsetSize,
+			ChildPolicy: &iserviceconfig.BalancerConfig{Name: "stub-child-balancer"},
+		},
+	}
+
+	// Send the resolver state to the random_subsetting balancer and verify that
+	// the child policy receives the expected number of endpoints.
+	if err := rsb.UpdateClientConnState(state); err != nil {
+		t.Fatalf("UpdateClientConnState failed: %v", err)
+	}
+
+	var wantEndpoints []resolver.Endpoint
+	select {
+	case ccs := <-updateCh:
+		// Store the subset for the next comparison.
+		wantEndpoints = ccs.ResolverState.Endpoints
+	case <-ctx.Done():
+		t.Fatal("Timed out waiting for child policy to receive an update")
+	}
+	if len(wantEndpoints) != subsetSize {
+		t.Fatalf("Child policy received %d endpoints, want %d", len(wantEndpoints), subsetSize)
+	}
+
+	// The subset must consist of distinct endpoints taken from the input.
+	seen := make(map[string]bool, len(wantEndpoints))
+	for _, ep := range wantEndpoints {
+		if len(ep.Addresses) == 0 {
+			t.Fatalf("Child policy received an endpoint with no addresses: %+v", ep)
+		}
+		addr := ep.Addresses[0].Addr
+		if !inputAddrs[addr] {
+			t.Fatalf("Child policy received endpoint %q, which is not one of the input endpoints", addr)
+		}
+		if seen[addr] {
+			t.Fatalf("Child policy received endpoint %q more than once", addr)
+		}
+		seen[addr] = true
+	}
+
+	// Call UpdateClientConnState again with the same configuration.
+	if err := rsb.UpdateClientConnState(state); err != nil {
+		t.Fatalf("Second UpdateClientConnState failed: %v", err)
+	}
+
+	// Verify that the child policy receives the same subset of endpoints.
+	select {
+	case ccs := <-updateCh:
+		if diff := cmp.Diff(wantEndpoints, ccs.ResolverState.Endpoints); diff != "" {
+			t.Fatalf("Child policy received a different subset of endpoints on second update, diff (-want +got):\n%s", diff)
+		}
+	case <-ctx.Done():
+		t.Fatal("Timed out waiting for second child policy update")
+	}
+}