[grid] Add Redis-backed external datastore for the Session Queue
Signed-off-by: Viet Nguyen Duc <[email protected]>
diff --git a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java
index 835150e..7986427 100644
--- a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java
+++ b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java
@@ -268,9 +268,7 @@ public static Distributor create(Config config) {
SessionMap sessions = new SessionMapOptions(config).getSessionMap();
SecretOptions secretOptions = new SecretOptions(config);
NewSessionQueueOptions newSessionQueueOptions = new NewSessionQueueOptions(config);
- NewSessionQueue sessionQueue =
- newSessionQueueOptions.getSessionQueue(
- "org.openqa.selenium.grid.sessionqueue.remote.RemoteNewSessionQueue");
+ NewSessionQueue sessionQueue = newSessionQueueOptions.getSessionQueue();
return new LocalDistributor(
tracer,
bus,
diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java b/java/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java
index ef6e74e..3cc64c4 100644
--- a/java/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java
+++ b/java/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java
@@ -35,6 +35,8 @@
public class NewSessionQueueOptions {
static final String SESSION_QUEUE_SECTION = "sessionqueue";
+ static final String DEFAULT_SESSION_QUEUE =
+ "org.openqa.selenium.grid.sessionqueue.remote.RemoteNewSessionQueue";
static final int DEFAULT_MAXIMUM_RESPONSE_DELAY = 8;
static final int DEFAULT_REQUEST_TIMEOUT = 300;
static final int DEFAULT_REQUEST_TIMEOUT_PERIOD = 10;
@@ -50,6 +52,12 @@ public NewSessionQueueOptions(Config config) {
public URI getSessionQueueUri() {
+ BaseServerOptions serverOptions = new BaseServerOptions(config);
+ String scheme =
+ config
+ .get(SESSION_QUEUE_SECTION, "scheme")
+ .orElse((serverOptions.isSecure() || serverOptions.isSelfSigned()) ? "https" : "http");
+
Optional<URI> host =
config
.get(SESSION_QUEUE_SECTION, "host")
@@ -72,8 +80,6 @@ public URI getSessionQueueUri() {
return host.get();
}
- BaseServerOptions serverOptions = new BaseServerOptions(config);
- String schema = (serverOptions.isSecure() || serverOptions.isSelfSigned()) ? "https" : "http";
Optional<Integer> port = config.getInt(SESSION_QUEUE_SECTION, "port");
Optional<String> hostname = config.get(SESSION_QUEUE_SECTION, "hostname");
@@ -82,7 +88,7 @@ public URI getSessionQueueUri() {
}
try {
- return new URI(schema, null, hostname.get(), port.get(), "", null, null);
+ return new URI(scheme, null, hostname.get(), port.get(), "", null, null);
} catch (URISyntaxException e) {
throw new ConfigException(
"Session queue server uri configured through host (%s) and port (%d) is not a valid URI",
@@ -90,6 +96,87 @@ public URI getSessionQueueUri() {
}
}
+  /**
+   * Builds the {@code redis://} address of the server backing the session queue.
+   *
+   * @return a URI with scheme {@code redis} built from the configured hostname and port
+   * @throws ConfigException if either setting is absent or they do not form a valid URI
+   */
+  public URI getRedisUri() {
+    Optional<Integer> redisPort = config.getInt(SESSION_QUEUE_SECTION, "port");
+    Optional<String> redisHost = config.get(SESSION_QUEUE_SECTION, "hostname");
+    if (!redisPort.isPresent() || !redisHost.isPresent()) {
+      throw new ConfigException(
+          "Unable to determine Redis hostname and port for the session queue");
+    }
+    String host = redisHost.get();
+    try {
+      return new URI("redis", null, host, redisPort.get(), "", null, null);
+    } catch (URISyntaxException invalid) {
+      throw new ConfigException(
+          "Redis session queue uri configured through hostname (%s) and port (%d) is not a valid"
+              + " URI",
+          host, redisPort.get());
+    }
+  }
+
+ /**
+ * Reads the {@code scheme} setting of the {@code sessionqueue} config section.
+ *
+ * @return the configured scheme, or an empty {@link Optional} if the setting is absent
+ */
+ public Optional<String> getSessionQueueScheme() {
+ return config.get(SESSION_QUEUE_SECTION, "scheme");
+ }
+
+ /**
+ * Reads the {@code implementation} setting of the {@code sessionqueue} config section.
+ *
+ * @return the configured class name, or an empty {@link Optional} if the setting is absent
+ */
+ public Optional<String> getSessionQueueImplementation() {
+ return config.get(SESSION_QUEUE_SECTION, "implementation");
+ }
+
+ /**
+ * Reads the {@code hostname} setting of the {@code sessionqueue} config section.
+ *
+ * @return the configured hostname, or an empty {@link Optional} if the setting is absent
+ */
+ public Optional<String> getSessionQueueHostname() {
+ return config.get(SESSION_QUEUE_SECTION, "hostname");
+ }
+
+ /**
+ * Reads the {@code port} setting of the {@code sessionqueue} config section as an integer.
+ *
+ * @return the configured port, or an empty {@link Optional} if the setting is absent
+ */
+ public Optional<Integer> getSessionQueuePort() {
+ return config.getInt(SESSION_QUEUE_SECTION, "port");
+ }
+
+ @ManagedAttribute(name = "SessionQueueScheme")
+ public String getSessionQueueSchemeAttribute() {
+ return getSessionQueueScheme().orElse("http"); // NOTE(review): getSessionQueueUri() can fall back to "https"
+ }
+
+  @ManagedAttribute(name = "SessionQueueImplementation")
+  public String getSessionQueueImplementationAttribute() {
+    // Report the same fallback class that getSessionQueue() loads when nothing is configured.
+    return getSessionQueueImplementation().orElse(DEFAULT_SESSION_QUEUE);
+  }
+
+ @ManagedAttribute(name = "SessionQueueHostname")
+ public String getSessionQueueHostnameAttribute() {
+ return getSessionQueueHostname().orElse("localhost"); // "localhost" is reported when no hostname is configured
+ }
+
+ @ManagedAttribute(name = "SessionQueuePort")
+ public int getSessionQueuePortAttribute() {
+ return getSessionQueuePort().orElse(-1); // -1 is reported when no port is configured
+ }
+
public Duration getMaximumResponseDelay() {
int timeout =
config
@@ -156,8 +243,8 @@ public long getRetryIntervalMilliseconds() {
return getSessionRequestRetryInterval().toMillis();
}
- public NewSessionQueue getSessionQueue(String implementation) {
+ public NewSessionQueue getSessionQueue() {
return config.getClass(
- SESSION_QUEUE_SECTION, "implementation", NewSessionQueue.class, implementation);
+ SESSION_QUEUE_SECTION, "implementation", NewSessionQueue.class, DEFAULT_SESSION_QUEUE);
}
}
diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/httpd/NewSessionQueueServer.java b/java/src/org/openqa/selenium/grid/sessionqueue/httpd/NewSessionQueueServer.java
index e02eeeb..9493b70 100644
--- a/java/src/org/openqa/selenium/grid/sessionqueue/httpd/NewSessionQueueServer.java
+++ b/java/src/org/openqa/selenium/grid/sessionqueue/httpd/NewSessionQueueServer.java
@@ -50,8 +50,6 @@
public class NewSessionQueueServer extends TemplateGridServerCommand {
private static final Logger LOG = Logger.getLogger(NewSessionQueueServer.class.getName());
- private static final String LOCAL_NEW_SESSION_QUEUE =
- "org.openqa.selenium.grid.sessionqueue.local.LocalNewSessionQueue";
@Override
public String getName() {
@@ -87,7 +85,7 @@ protected Config getDefaultConfig() {
protected Handlers createHandlers(Config config) {
NewSessionQueueOptions queueOptions = new NewSessionQueueOptions(config);
- NewSessionQueue sessionQueue = queueOptions.getSessionQueue(LOCAL_NEW_SESSION_QUEUE);
+ NewSessionQueue sessionQueue = queueOptions.getSessionQueue();
return new Handlers(
Route.combine(
diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel b/java/src/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel
new file mode 100644
index 0000000..d87d585
--- /dev/null
+++ b/java/src/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel
@@ -0,0 +1,30 @@
+load("@rules_jvm_external//:defs.bzl", "artifact")
+load("//java:defs.bzl", "java_export")
+load("//java:version.bzl", "SE_VERSION")
+
+java_export(
+ name = "redis",
+ srcs = glob(["*.java"]),  # every .java file in this package
+ maven_coordinates = "org.seleniumhq.selenium:selenium-session-queue-redis:%s" % SE_VERSION,
+ pom_template = "//java/src/org/openqa/selenium:template-pom",
+ tags = [
+ "release-artifact",
+ ],
+ visibility = [
+ "//visibility:public",
+ ],
+ exports = [
+ "//java/src/org/openqa/selenium/grid",
+ ],
+ deps = [
+ "//java:auto-service",  # NOTE(review): RedisBackedSessionQueue.java uses no auto-service annotation; confirm needed
+ "//java/src/org/openqa/selenium/grid",
+ "//java/src/org/openqa/selenium/json",
+ "//java/src/org/openqa/selenium/redis",
+ "//java/src/org/openqa/selenium/remote",
+ artifact("com.beust:jcommander"),  # NOTE(review): RedisBackedSessionQueue.java has no com.beust import; confirm needed
+ artifact("com.google.guava:guava"),
+ artifact("io.lettuce:lettuce-core"),
+ artifact("org.redisson:redisson"),  # NOTE(review): RedisBackedSessionQueue.java has no org.redisson import; confirm needed
+ ],
+)
diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueue.java b/java/src/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueue.java
new file mode 100644
index 0000000..2be0fa8
--- /dev/null
+++ b/java/src/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueue.java
@@ -0,0 +1,537 @@
+// Licensed to the Software Freedom Conservancy (SFC) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The SFC licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.openqa.selenium.grid.sessionqueue.redis;
+
+import static org.openqa.selenium.remote.tracing.Tags.EXCEPTION;
+
+import com.google.common.collect.ImmutableMap;
+import io.lettuce.core.KeyValue;
+import java.io.Closeable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.openqa.selenium.Capabilities;
+import org.openqa.selenium.SessionNotCreatedException;
+import org.openqa.selenium.grid.config.Config;
+import org.openqa.selenium.grid.data.CreateSessionResponse;
+import org.openqa.selenium.grid.data.RequestId;
+import org.openqa.selenium.grid.data.SessionRequest;
+import org.openqa.selenium.grid.data.SessionRequestCapability;
+import org.openqa.selenium.grid.jmx.JMXHelper;
+import org.openqa.selenium.grid.jmx.ManagedAttribute;
+import org.openqa.selenium.grid.jmx.ManagedService;
+import org.openqa.selenium.grid.log.LoggingOptions;
+import org.openqa.selenium.grid.security.Secret;
+import org.openqa.selenium.grid.security.SecretOptions;
+import org.openqa.selenium.grid.sessionqueue.NewSessionQueue;
+import org.openqa.selenium.grid.sessionqueue.config.NewSessionQueueOptions;
+import org.openqa.selenium.internal.Either;
+import org.openqa.selenium.internal.Require;
+import org.openqa.selenium.json.Json;
+import org.openqa.selenium.redis.GridRedisClient;
+import org.openqa.selenium.remote.http.HttpResponse;
+import org.openqa.selenium.remote.tracing.AttributeKey;
+import org.openqa.selenium.remote.tracing.AttributeMap;
+import org.openqa.selenium.remote.tracing.Span;
+import org.openqa.selenium.remote.tracing.Status;
+import org.openqa.selenium.remote.tracing.Tracer;
+
+/**
+ * A Redis-backed implementation of {@link NewSessionQueue} that stores session requests in Redis
+ * for distributed session queue management across multiple Grid instances.
+ */
+@ManagedService(
+ objectName = "org.seleniumhq.grid:type=SessionQueue,name=RedisBackedSessionQueue",
+ description = "Redis backed session queue")
+public class RedisBackedSessionQueue extends NewSessionQueue implements Closeable {
+
+ private static final Logger LOG = Logger.getLogger(RedisBackedSessionQueue.class.getName());
+ private static final Json JSON = new Json();
+ private static final String QUEUE_KEY = "session:queue"; // Redis list holding request ids
+ private static final String REQUEST_KEY_PREFIX = "session:request:"; // + request id -> SessionRequest JSON
+ private static final String ENQUEUE_TIME_KEY_PREFIX = "session:enqueue_time:"; // + request id -> enqueue time string
+
+ // Attribute names attached to spans and span events for each Redis operation
+ private static final String REDIS_OPERATION = "redis.operation";
+ private static final String REDIS_KEY = "redis.key";
+ private static final String REDIS_VALUE = "redis.value";
+ private static final String REDIS_URI = "redis.uri";
+
+ private final GridRedisClient connection; // returned by getRedisClient()
+ private final URI redisUri; // never null: checked with Require.nonNull in every constructor
+
+ public RedisBackedSessionQueue(Tracer tracer, Secret registrationSecret, URI redisUri) {
+ super(tracer, registrationSecret);
+ this.redisUri = Require.nonNull("Redis URI", redisUri);
+ this.connection = createRedisClient(redisUri); // overridable hook, called during construction
+ // Both fields are assigned by now; hand the instance to JMXHelper for registration.
+ new JMXHelper().register(this);
+ }
+
+  /**
+   * Test-oriented constructor that accepts a ready-made Redis client instead of creating one.
+   *
+   * <p>Equivalent to the five-argument constructor with {@code skipJmxRegistration} set to
+   * {@code false}, so the instance is always registered with JMX.
+   *
+   * @param tracer the tracer for observability
+   * @param registrationSecret the registration secret
+   * @param redisUri the Redis URI; must not be null
+   * @param redisClient the Redis client to use (for testing)
+   */
+  protected RedisBackedSessionQueue(
+      Tracer tracer, Secret registrationSecret, URI redisUri, GridRedisClient redisClient) {
+    // Delegate so field setup and JMX registration are written in exactly one place.
+    this(tracer, registrationSecret, redisUri, redisClient, false);
+  }
+
+  /**
+   * Test-oriented constructor that takes an existing Redis client and lets the caller decide
+   * whether the new instance is registered with JMX.
+   *
+   * @param tracer the tracer used to create spans
+   * @param registrationSecret the registration secret passed to the superclass
+   * @param redisUri the Redis URI; must not be null
+   * @param redisClient the Redis client to use, stored as given
+   * @param skipJmxRegistration {@code true} to skip the JMX registration step
+   */
+  protected RedisBackedSessionQueue(
+      Tracer tracer,
+      Secret registrationSecret,
+      URI redisUri,
+      GridRedisClient redisClient,
+      boolean skipJmxRegistration) {
+    super(tracer, registrationSecret);
+    this.redisUri = Require.nonNull("Redis URI", redisUri);
+    this.connection = redisClient;
+    if (skipJmxRegistration) {
+      return;
+    }
+    new JMXHelper().register(this);
+  }
+
+ /**
+ * Creates the {@link GridRedisClient} for the given URI. The public constructor calls this
+ * method, so an override runs before the overriding subclass has initialised its own fields.
+ *
+ * @param redisUri the Redis URI
+ * @return the GridRedisClient instance
+ */
+ protected GridRedisClient createRedisClient(URI redisUri) {
+ return new GridRedisClient(redisUri);
+ }
+
+  /**
+   * Factory used by the Grid configuration system to build a Redis-backed session queue.
+   *
+   * <p>Reads the tracer, the registration secret and the Redis address from {@code config}.
+   *
+   * @param config the configuration object
+   * @return a new RedisBackedSessionQueue instance
+   */
+  public static NewSessionQueue create(Config config) {
+    Tracer tracer = new LoggingOptions(config).getTracer();
+    Secret registrationSecret = new SecretOptions(config).getRegistrationSecret();
+
+    // Use getRedisUri() rather than getSessionQueueUri(): the address handed to the Redis
+    // client must carry the redis:// scheme, not an http(s):// one.
+    URI redisAddress = new NewSessionQueueOptions(config).getRedisUri();
+
+    return new RedisBackedSessionQueue(tracer, registrationSecret, redisAddress);
+  }
+
+ @Override
+ public boolean isReady() {
+ return getRedisClient().isOpen(); // ready exactly when the Redis client reports itself open
+ }
+
+ /**
+ * Returns the Redis client that every queue operation in this class goes through. Overridable,
+ * so a subclass (for example a test double) can substitute its own client.
+ *
+ * @return the client stored at construction time
+ */
+ protected GridRedisClient getRedisClient() {
+ return connection;
+ }
+
+  /**
+   * Reports whether the Redis list backing the queue currently holds no request ids.
+   *
+   * @return {@code true} if LLEN is zero; {@code false} otherwise, also when the check fails
+   */
+  @Override
+  public boolean peekEmpty() {
+    try (Span span = tracer.getCurrentContext().createSpan("LLEN " + QUEUE_KEY)) {
+      AttributeMap attributes = tracer.createAttributeMap();
+      setCommonSpanAttributes(span);
+      setCommonEventAttributes(attributes);
+      span.setAttribute(REDIS_OPERATION, "LLEN");
+      attributes.put(REDIS_OPERATION, "LLEN");
+      span.setAttribute(REDIS_KEY, QUEUE_KEY);
+      attributes.put(REDIS_KEY, QUEUE_KEY);
+      try {
+        Long length = getRedisClient().getConnection().sync().llen(QUEUE_KEY);
+        boolean empty = length == 0;
+        attributes.put("queue.empty", empty);
+        attributes.put("queue.length", length);
+        span.addEvent("Checked queue emptiness", attributes);
+        return empty;
+      } catch (Exception e) {
+        span.setAttribute("error", true);
+        span.setStatus(Status.CANCELLED);
+        EXCEPTION.accept(attributes, e);
+        String detail = "Unable to check if queue is empty: " + e.getMessage();
+        attributes.put(AttributeKey.EXCEPTION_MESSAGE.getKey(), detail);
+        span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributes);
+        LOG.log(Level.SEVERE, "Failed to check if queue is empty", e);
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Stores the request payload and its enqueue time, then pushes the request id onto the queue
+   * list. If the id never reaches the list, the stored keys are deleted again (best effort).
+   *
+   * @param request the session request to enqueue; must not be null
+   * @return a response with status 200 on success, or 500 if a Redis call fails
+   */
+  @Override
+  public HttpResponse addToQueue(SessionRequest request) {
+    Require.nonNull("SessionRequest to add", request);
+    try (Span span =
+        tracer.getCurrentContext().createSpan("LPUSH " + QUEUE_KEY + " and MSET request data")) {
+      AttributeMap attributeMap = tracer.createAttributeMap();
+      setCommonSpanAttributes(span);
+      setCommonEventAttributes(attributeMap);
+      String requestId = request.getRequestId().toString();
+      String requestKey = REQUEST_KEY_PREFIX + requestId;
+      String enqueueTimeKey = ENQUEUE_TIME_KEY_PREFIX + requestId;
+      String requestJson = JSON.toJson(request);
+      String enqueueTime = request.getEnqueued().toString();
+      span.setAttribute(REDIS_OPERATION, "LPUSH+MSET");
+      span.setAttribute(REDIS_KEY, QUEUE_KEY);
+      span.setAttribute(REDIS_VALUE, requestId);
+      attributeMap.put(REDIS_OPERATION, "LPUSH+MSET");
+      attributeMap.put(REDIS_KEY, QUEUE_KEY);
+      attributeMap.put(REDIS_VALUE, requestId);
+      attributeMap.put("request.id", requestId);
+      HttpResponse resp = new HttpResponse();
+      boolean queued = false;
+      try {
+        getRedisClient()
+            .mset(ImmutableMap.of(requestKey, requestJson, enqueueTimeKey, enqueueTime));
+        Long queueLength = getRedisClient().getConnection().sync().lpush(QUEUE_KEY, requestId);
+        queued = true;
+        attributeMap.put("queue.length", queueLength);
+        span.addEvent("Added request to queue", attributeMap);
+        resp.setStatus(200);
+      } catch (Exception e) {
+        span.setAttribute("error", true);
+        span.setStatus(Status.CANCELLED);
+        EXCEPTION.accept(attributeMap, e);
+        String detail = "Unable to add session request to Redis: " + e.getMessage();
+        attributeMap.put(AttributeKey.EXCEPTION_MESSAGE.getKey(), detail);
+        span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap);
+        LOG.log(Level.SEVERE, "Failed to add session request to queue", e);
+        if (!queued) {
+          try {
+            getRedisClient().del(requestKey, enqueueTimeKey);
+          } catch (Exception ex) {
+            LOG.log(Level.WARNING, "Failed to discard unqueued request: " + requestId, ex);
+          }
+        }
+        resp.setStatus(500);
+      }
+      return resp;
+    }
+  }
+
+  @Override
+  public boolean retryAddToQueue(SessionRequest request) {
+    // Same path as a first-time enqueue (LPUSH), so a retried request joins behind waiting ones.
+    return addToQueue(request).getStatus() == 200;
+  }
+
+  /**
+   * Removes one queued request by id and returns it.
+   *
+   * @param requestId id of the request to take out of the queue; must not be null
+   * @return the stored request, or empty if no data exists for the id or a Redis call fails
+   */
+  @Override
+  public Optional<SessionRequest> remove(RequestId requestId) {
+    Require.nonNull("RequestId to remove", requestId);
+
+    try (Span span = tracer.getCurrentContext().createSpan("LREM and GET request data")) {
+      AttributeMap attributes = tracer.createAttributeMap();
+      setCommonSpanAttributes(span);
+      setCommonEventAttributes(attributes);
+      String id = requestId.toString();
+      String dataKey = REQUEST_KEY_PREFIX + id;
+      String timeKey = ENQUEUE_TIME_KEY_PREFIX + id;
+      span.setAttribute(REDIS_OPERATION, "GET+LREM+DEL");
+      span.setAttribute(REDIS_KEY, dataKey);
+      attributes.put(REDIS_OPERATION, "GET+LREM+DEL");
+      attributes.put(REDIS_KEY, dataKey);
+      attributes.put("request.id", id);
+
+      try {
+        String payload = getRedisClient().get(dataKey);
+        if (payload == null) {
+          attributes.put("request.found", false);
+          span.addEvent("Session request not found in queue", attributes);
+          return Optional.empty();
+        }
+
+        // The payload was read before the id and keys are dropped, so it can still be returned.
+        Long removed = getRedisClient().getConnection().sync().lrem(QUEUE_KEY, 1, id);
+        getRedisClient().del(dataKey, timeKey);
+        attributes.put("removed.count", removed);
+        span.addEvent("Removed request from queue", attributes);
+        return Optional.of(JSON.toType(payload, SessionRequest.class));
+      } catch (Exception e) {
+        span.setAttribute("error", true);
+        span.setStatus(Status.CANCELLED);
+        EXCEPTION.accept(attributes, e);
+        attributes.put(
+            AttributeKey.EXCEPTION_MESSAGE.getKey(),
+            "Unable to remove session request from queue: " + e.getMessage());
+        span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributes);
+        LOG.log(Level.SEVERE, "Failed to remove session request from queue", e);
+        return Optional.empty();
+      }
+    }
+  }
+
+  /**
+   * Pops the id at the consuming end of the queue list and returns the request stored for it.
+   *
+   * @param stereotypes NOTE(review): never read here, so the popped request is returned whether
+   *     or not it matches a stereotype; confirm this is intended
+   * @return a single-element list, or an empty list if nothing could be retrieved
+   */
+  @Override
+  public List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes) {
+    try (Span span = tracer.getCurrentContext().createSpan("RPOP and GET next available request")) {
+      AttributeMap attributes = tracer.createAttributeMap();
+      setCommonSpanAttributes(span);
+      setCommonEventAttributes(attributes);
+      span.setAttribute(REDIS_OPERATION, "RPOP+GET");
+      span.setAttribute(REDIS_KEY, QUEUE_KEY);
+      attributes.put(REDIS_OPERATION, "RPOP+GET");
+      attributes.put(REDIS_KEY, QUEUE_KEY);
+      try {
+        // Ids are pushed on the left and popped on the right, which makes the list FIFO.
+        String id = getRedisClient().getConnection().sync().rpop(QUEUE_KEY);
+        String payload = id == null ? null : getRedisClient().get(REQUEST_KEY_PREFIX + id);
+        if (payload != null) {
+          SessionRequest request = JSON.toType(payload, SessionRequest.class);
+          attributes.put("requests.found", 1);
+          attributes.put("request.id", id);
+          span.addEvent("Retrieved next available session request", attributes);
+          return List.of(request);
+        }
+        if (id != null) {
+          // An id was queued but its payload key is gone: report it and answer with no request.
+          LOG.log(Level.WARNING, "Request data missing for ID: " + id);
+        }
+        attributes.put("requests.found", 0);
+        span.addEvent("No session requests available", attributes);
+      } catch (Exception e) {
+        span.setAttribute("error", true);
+        span.setStatus(Status.CANCELLED);
+        EXCEPTION.accept(attributes, e);
+        attributes.put(
+            AttributeKey.EXCEPTION_MESSAGE.getKey(),
+            "Unable to get next available session request: " + e.getMessage());
+        span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributes);
+        LOG.log(Level.SEVERE, "Failed to get next available session request", e);
+      }
+    }
+    return List.of();
+  }
+
+  /**
+   * Deletes the payload and enqueue-time keys of a request and reports whether that worked.
+   *
+   * <p>NOTE(review): {@code result} is never read here; confirm that is intended.
+   */
+  @Override
+  public boolean complete(
+      RequestId reqId, Either<SessionNotCreatedException, CreateSessionResponse> result) {
+    String id = reqId.toString();
+    try {
+      getRedisClient().del(REQUEST_KEY_PREFIX + id, ENQUEUE_TIME_KEY_PREFIX + id);
+      return true;
+    } catch (Exception e) {
+      LOG.log(Level.WARNING, "Failed to clean up completed request: " + id, e);
+      return false;
+    }
+  }
+
+  /**
+   * Deletes the queue list together with the payload and enqueue-time keys of every listed id.
+   *
+   * @return the number of ids that were in the list, or 0 if the list was empty or a call failed
+   */
+  @Override
+  public int clearQueue() {
+    try (Span span =
+        tracer.getCurrentContext().createSpan("Clear all session requests from queue")) {
+      AttributeMap attributes = tracer.createAttributeMap();
+      setCommonSpanAttributes(span);
+      setCommonEventAttributes(attributes);
+      span.setAttribute(REDIS_OPERATION, "LRANGE+DEL");
+      span.setAttribute(REDIS_KEY, QUEUE_KEY);
+      attributes.put(REDIS_OPERATION, "LRANGE+DEL");
+      attributes.put(REDIS_KEY, QUEUE_KEY);
+
+      try {
+        List<String> ids = getRedisClient().getConnection().sync().lrange(QUEUE_KEY, 0, -1);
+        if (!ids.isEmpty()) {
+          // Two keys per request, plus the list key itself as the final entry.
+          List<String> doomed = new ArrayList<>();
+          ids.forEach(
+              id -> {
+                doomed.add(REQUEST_KEY_PREFIX + id);
+                doomed.add(ENQUEUE_TIME_KEY_PREFIX + id);
+              });
+          doomed.add(QUEUE_KEY);
+          getRedisClient().del(doomed.toArray(new String[0]));
+        }
+        int cleared = ids.size();
+        attributes.put("requests.cleared", cleared);
+        span.addEvent("Cleared all session requests from queue", attributes);
+        return cleared;
+      } catch (Exception e) {
+        span.setAttribute("error", true);
+        span.setStatus(Status.CANCELLED);
+        EXCEPTION.accept(attributes, e);
+        attributes.put(
+            AttributeKey.EXCEPTION_MESSAGE.getKey(),
+            "Unable to clear session queue: " + e.getMessage());
+        span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributes);
+        LOG.log(Level.SEVERE, "Failed to clear session queue", e);
+        return 0;
+      }
+    }
+  }
+
+  /**
+   * Lists the capabilities of every queued request, next-to-be-served first.
+   *
+   * <p>Ids are added with LPUSH and consumed with RPOP, so LRANGE yields them newest first; the
+   * result is therefore walked from its tail. Entries without a parsable payload are skipped.
+   *
+   * @return the queue contents in serving order, or an empty list if a Redis call fails
+   */
+  @Override
+  public List<SessionRequestCapability> getQueueContents() {
+    try (Span span = tracer.getCurrentContext().createSpan("Get all session requests from queue")) {
+      AttributeMap attributeMap = tracer.createAttributeMap();
+      setCommonSpanAttributes(span);
+      setCommonEventAttributes(attributeMap);
+      span.setAttribute(REDIS_OPERATION, "LRANGE+MGET");
+      span.setAttribute(REDIS_KEY, QUEUE_KEY);
+      attributeMap.put(REDIS_OPERATION, "LRANGE+MGET");
+      attributeMap.put(REDIS_KEY, QUEUE_KEY);
+
+      try {
+        List<String> requestIds = getRedisClient().getConnection().sync().lrange(QUEUE_KEY, 0, -1);
+        List<SessionRequestCapability> contents = new ArrayList<>();
+        if (!requestIds.isEmpty()) {
+          String[] requestKeys =
+              requestIds.stream().map(id -> REQUEST_KEY_PREFIX + id).toArray(String[]::new);
+          List<KeyValue<String, String>> requestData = getRedisClient().mget(requestKeys);
+
+          // Walk backwards: the tail of the Redis list is the request that RPOP serves next.
+          for (int i = requestIds.size() - 1; i >= 0; i--) {
+            String id = requestIds.get(i);
+            KeyValue<String, String> keyValue = requestData.get(i);
+            if (keyValue == null || !keyValue.hasValue()) {
+              continue;
+            }
+            try {
+              RequestId parsedId = new RequestId(UUID.fromString(id));
+              SessionRequest request = JSON.toType(keyValue.getValue(), SessionRequest.class);
+              contents.add(
+                  new SessionRequestCapability(parsedId, request.getDesiredCapabilities()));
+            } catch (Exception e) {
+              LOG.log(
+                  Level.WARNING, "Failed to parse session request from queue: " + id, e);
+            }
+          }
+        }
+
+        attributeMap.put("queue.contents.size", contents.size());
+        span.addEvent("Retrieved queue contents", attributeMap);
+        return contents;
+      } catch (Exception e) {
+        span.setAttribute("error", true);
+        span.setStatus(Status.CANCELLED);
+        EXCEPTION.accept(attributeMap, e);
+        attributeMap.put(
+            AttributeKey.EXCEPTION_MESSAGE.getKey(),
+            "Unable to get queue contents: " + e.getMessage());
+        span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap);
+        LOG.log(Level.SEVERE, "Failed to get queue contents", e);
+      }
+    }
+    return List.of();
+  }
+
+ @Override
+ public void close() {
+ try {
+ getRedisClient().close();
+ } catch (Exception e) { // best effort: a failed close is logged, not rethrown
+ LOG.log(Level.WARNING, "Failed to close Redis connection for SessionQueue", e);
+ }
+ }
+
+ @ManagedAttribute(name = "RedisUri")
+ public String getRedisUri() {
+ return redisUri.toString(); // string form of the URI supplied at construction
+ }
+
+ @ManagedAttribute(name = "QueueSize")
+ public long getQueueSize() {
+ try {
+ return getRedisClient().getConnection().sync().llen(QUEUE_KEY); // LLEN of the queue list
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Failed to get queue size", e);
+ return -1; // sentinel: the length could not be read
+ }
+ }
+
+ private void setCommonSpanAttributes(Span span) {
+ span.setAttribute("span.kind", Span.Kind.CLIENT.toString()); // every span here is tagged as a CLIENT span
+ span.setAttribute(REDIS_URI, redisUri.toString());
+ }
+
+ private void setCommonEventAttributes(AttributeMap attributeMap) {
+ attributeMap.put(REDIS_URI, redisUri.toString()); // span events carry the Redis URI as well
+ }
+}
diff --git a/java/test/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel b/java/test/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel
new file mode 100644
index 0000000..10adbef
--- /dev/null
+++ b/java/test/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel
@@ -0,0 +1,22 @@
+load("@rules_jvm_external//:defs.bzl", "artifact")
+load("//java:defs.bzl", "JUNIT5_DEPS", "java_test_suite")
+
+java_test_suite(
+ name = "MediumTests",
+ size = "medium",
+ srcs = glob(["*Test.java"]),  # every *Test.java file in this package
+ deps = [
+ "//java/src/org/openqa/selenium/events/local",
+ "//java/src/org/openqa/selenium/grid/sessionqueue/redis",  # the library under test
+ "//java/src/org/openqa/selenium/json",
+ "//java/src/org/openqa/selenium/redis",
+ "//java/src/org/openqa/selenium/remote",
+ "//java/test/org/openqa/selenium/remote/tracing:tracing-support",
+ "//java/test/org/openqa/selenium/testing:test-base",
+ artifact("io.lettuce:lettuce-core"),
+ artifact("io.opentelemetry:opentelemetry-api"),
+ artifact("org.junit.jupiter:junit-jupiter-api"),
+ artifact("org.assertj:assertj-core"),
+ artifact("org.mockito:mockito-core"),
+ ] + JUNIT5_DEPS,
+)
diff --git a/java/test/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueueTest.java b/java/test/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueueTest.java
new file mode 100644
index 0000000..97abe20
--- /dev/null
+++ b/java/test/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueueTest.java
@@ -0,0 +1,494 @@
+// Licensed to the Software Freedom Conservancy (SFC) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The SFC licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.openqa.selenium.grid.sessionqueue.redis;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+import static org.openqa.selenium.remote.http.HttpMethod.POST;
+
+import io.lettuce.core.KeyValue;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import java.net.URI;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.openqa.selenium.Capabilities;
+import org.openqa.selenium.SessionNotCreatedException;
+import org.openqa.selenium.grid.data.CreateSessionResponse;
+import org.openqa.selenium.grid.data.RequestId;
+import org.openqa.selenium.grid.data.SessionRequest;
+import org.openqa.selenium.grid.data.SessionRequestCapability;
+import org.openqa.selenium.grid.jmx.ManagedService;
+import org.openqa.selenium.grid.security.Secret;
+import org.openqa.selenium.internal.Either;
+import org.openqa.selenium.json.Json;
+import org.openqa.selenium.redis.GridRedisClient;
+import org.openqa.selenium.remote.http.Contents;
+import org.openqa.selenium.remote.http.HttpRequest;
+import org.openqa.selenium.remote.http.HttpResponse;
+import org.openqa.selenium.remote.tracing.DefaultTestTracer;
+import org.openqa.selenium.remote.tracing.Tracer;
+
+class RedisBackedSessionQueueTest {
+
+ // Shared fixtures: a test tracer, the registration secret, the Redis URI handed to the queue,
+ // and a Json instance used to serialise SessionRequests for stubbed Redis replies.
+ private static final Tracer tracer = DefaultTestTracer.createTracer();
+ private static final Secret secret = new Secret("test-secret");
+ private static final URI redisUri = URI.create("redis://localhost:6379");
+ private static final Json JSON = new Json();
+
+ // Mockito mocks for the Redis client, its connection and the synchronous command API;
+ // setUp() initialises them and wires client -> connection -> commands.
+ @Mock private GridRedisClient mockRedisClient;
+ @Mock private StatefulRedisConnection<String, String> mockConnection;
+ @Mock private RedisCommands<String, String> mockCommands;
+
+ // Queue under test, rebuilt before every test around the mocked client.
+ private TestableRedisBackedSessionQueue queue;
+
+ // Handle returned by MockitoAnnotations.openMocks(); closed again in tearDown().
+ private AutoCloseable mocks;
+
+ /**
+  * Initialises the {@code @Mock} fields, wires client -> connection -> synchronous commands,
+  * stubs the client as open, and builds the queue under test around the mocked client.
+  */
+ @BeforeEach
+ void setUp() {
+   // Keep the returned handle so the mock session can be released after the test.
+   mocks = MockitoAnnotations.openMocks(this);
+   when(mockRedisClient.getConnection()).thenReturn(mockConnection);
+   when(mockConnection.sync()).thenReturn(mockCommands);
+   when(mockRedisClient.isOpen()).thenReturn(true);
+
+   // Create queue with mocked Redis client and skip JMX registration
+   queue = new TestableRedisBackedSessionQueue(tracer, secret, redisUri, mockRedisClient, true);
+ }
+
+ /**
+  * Releases the resources opened by {@code MockitoAnnotations.openMocks}.
+  *
+  * @throws Exception if closing the mock session fails
+  */
+ @AfterEach
+ void tearDown() throws Exception {
+   // setUp() may have failed before the handle was assigned.
+   if (mocks != null) {
+     mocks.close();
+   }
+ }
+
+ /**
+  * Test-specific subclass whose constructors forward a caller-supplied {@link GridRedisClient}
+  * to the parent class, so a mock can be used in place of a real Redis client.
+  */
+ @ManagedService
+ private static class TestableRedisBackedSessionQueue extends RedisBackedSessionQueue {
+
+   /** Forwards to the parent constructor that also takes the skip-JMX-registration flag. */
+   public TestableRedisBackedSessionQueue(
+       Tracer tracer,
+       Secret registrationSecret,
+       URI redisUri,
+       GridRedisClient redisClient,
+       boolean skipJmxRegistration) {
+     super(tracer, registrationSecret, redisUri, redisClient, skipJmxRegistration);
+   }
+
+   /** Forwards to the parent constructor that takes only the client. */
+   public TestableRedisBackedSessionQueue(
+       Tracer tracer, Secret registrationSecret, URI redisUri, GridRedisClient redisClient) {
+     super(tracer, registrationSecret, redisUri, redisClient);
+   }
+ }
+
+ /** Constructing the queue with a null Redis URI must fail with IllegalArgumentException. */
+ @Test
+ void shouldThrowIllegalArgumentExceptionIfRedisUriIsNull() {
+   assertThatIllegalArgumentException()
+       .isThrownBy(() -> new RedisBackedSessionQueue(tracer, secret, null));
+ }
+
+ /** Constructing the queue with a null tracer must fail with IllegalArgumentException. */
+ @Test
+ void shouldThrowIllegalArgumentExceptionIfTracerIsNull() {
+   assertThatIllegalArgumentException()
+       .isThrownBy(() -> new RedisBackedSessionQueue(null, secret, redisUri));
+ }
+
+ /** Constructing the queue with a null secret must fail with IllegalArgumentException. */
+ @Test
+ void shouldThrowIllegalArgumentExceptionIfSecretIsNull() {
+   assertThatIllegalArgumentException()
+       .isThrownBy(() -> new RedisBackedSessionQueue(tracer, null, redisUri));
+ }
+
+ /** The queue reports ready when the Redis client says it is open. */
+ @Test
+ void isReadyShouldReturnTrueWhenRedisConnectionIsOpen() {
+   doReturn(true).when(mockRedisClient).isOpen();
+
+   boolean ready = queue.isReady();
+
+   assertThat(ready).isTrue();
+ }
+
+ /** The queue reports not ready when the Redis client says it is closed. */
+ @Test
+ void isReadyShouldReturnFalseWhenRedisConnectionIsClosed() {
+   doReturn(false).when(mockRedisClient).isOpen();
+
+   boolean ready = queue.isReady();
+
+   assertThat(ready).isFalse();
+ }
+
+ /** A list length of zero is reported as an empty queue. */
+ @Test
+ void peekEmptyShouldReturnTrueWhenQueueIsEmpty() {
+   doReturn(0L).when(mockCommands).llen("session:queue");
+
+   boolean empty = queue.peekEmpty();
+
+   assertThat(empty).isTrue();
+ }
+
+ /** A non-zero list length is reported as a non-empty queue. */
+ @Test
+ void peekEmptyShouldReturnFalseWhenQueueHasRequests() {
+   doReturn(2L).when(mockCommands).llen("session:queue");
+
+   boolean empty = queue.peekEmpty();
+
+   assertThat(empty).isFalse();
+ }
+
+ @Test
+ void peekEmptyShouldReturnFalseOnRedisException() {
+ when(mockCommands.llen("session:queue"))
+ .thenThrow(new RedisCommandExecutionException("Redis error"));
+ assertThat(queue.peekEmpty()).isFalse();
+ }
+
+ /**
+  * Adding a request answers 200, writes both per-request keys through {@code mset} and pushes
+  * the request id onto the queue list.
+  */
+ @Test
+ void canAddSessionRequestToQueue() {
+   RequestId id = new RequestId(UUID.randomUUID());
+   String idText = id.toString();
+   SessionRequest sessionRequest = createSessionRequest(id);
+   when(mockCommands.lpush("session:queue", idText)).thenReturn(1L);
+
+   HttpResponse outcome = queue.addToQueue(sessionRequest);
+
+   assertThat(outcome.getStatus()).isEqualTo(200);
+
+   // The payload key and the enqueue-time key must both be part of the mset call.
+   String payloadKey = "session:request:" + idText;
+   String enqueueTimeKey = "session:enqueue_time:" + idText;
+   verify(mockRedisClient)
+       .mset(
+           argThat(
+               written ->
+                   written.containsKey(payloadKey) && written.containsKey(enqueueTimeKey)));
+   verify(mockCommands).lpush("session:queue", idText);
+ }
+
+ @Test
+ void addToQueueShouldReturn500OnRedisException() {
+ RequestId requestId = new RequestId(UUID.randomUUID());
+ SessionRequest request = createSessionRequest(requestId);
+
+ doThrow(new RedisCommandExecutionException("Redis error")).when(mockRedisClient).mset(any());
+
+ HttpResponse response = queue.addToQueue(request);
+
+ assertThat(response.getStatus()).isEqualTo(500);
+ }
+
+ /** Re-adding a request succeeds when the Redis calls go through. */
+ @Test
+ void retryAddToQueueShouldReturnTrueOnSuccess() {
+   RequestId id = new RequestId(UUID.randomUUID());
+   SessionRequest sessionRequest = createSessionRequest(id);
+   doReturn(1L).when(mockCommands).lpush("session:queue", id.toString());
+
+   boolean requeued = queue.retryAddToQueue(sessionRequest);
+
+   assertThat(requeued).isTrue();
+ }
+
+ @Test
+ void retryAddToQueueShouldReturnFalseOnFailure() {
+ RequestId requestId = new RequestId(UUID.randomUUID());
+ SessionRequest request = createSessionRequest(requestId);
+
+ doThrow(new RedisCommandExecutionException("Redis error")).when(mockRedisClient).mset(any());
+
+ boolean result = queue.retryAddToQueue(request);
+
+ assertThat(result).isFalse();
+ }
+
+ /**
+  * Removing a stored request returns it, drops its id from the queue list and deletes both
+  * per-request keys.
+  */
+ @Test
+ void canRemoveSessionRequestFromQueue() {
+   RequestId id = new RequestId(UUID.randomUUID());
+   String idText = id.toString();
+   String payloadKey = "session:request:" + idText;
+   String storedJson = JSON.toJson(createSessionRequest(id));
+   when(mockRedisClient.get(payloadKey)).thenReturn(storedJson);
+   when(mockCommands.lrem("session:queue", 1, idText)).thenReturn(1L);
+
+   Optional<SessionRequest> removed = queue.remove(id);
+
+   assertThat(removed).isPresent();
+   assertThat(removed.get().getRequestId()).isEqualTo(id);
+
+   // Expected Redis traffic: read the payload, drop the id, delete both keys.
+   verify(mockRedisClient).get(payloadKey);
+   verify(mockCommands).lrem("session:queue", 1, idText);
+   verify(mockRedisClient).del(payloadKey, "session:enqueue_time:" + idText);
+ }
+
+ /** Removing an id with no stored payload yields an empty Optional. */
+ @Test
+ void removeShouldReturnEmptyWhenRequestNotFound() {
+   RequestId id = new RequestId(UUID.randomUUID());
+   String payloadKey = "session:request:" + id;
+   when(mockRedisClient.get(payloadKey)).thenReturn(null);
+
+   Optional<SessionRequest> removed = queue.remove(id);
+
+   assertThat(removed).isEmpty();
+ }
+
+ @Test
+ void removeShouldReturnEmptyOnRedisException() {
+ RequestId requestId = new RequestId(UUID.randomUUID());
+
+ when(mockRedisClient.get("session:request:" + requestId.toString()))
+ .thenThrow(new RedisCommandExecutionException("Redis error"));
+
+ Optional<SessionRequest> removed = queue.remove(requestId);
+
+ assertThat(removed).isEmpty();
+ }
+
+ /** The id returned by {@code rpop} is resolved to its stored request and handed back. */
+ @Test
+ void getNextAvailableShouldReturnOldestRequest() {
+   RequestId id = new RequestId(UUID.randomUUID());
+   String payloadKey = "session:request:" + id;
+   String storedJson = JSON.toJson(createSessionRequest(id));
+   when(mockCommands.rpop("session:queue")).thenReturn(id.toString());
+   when(mockRedisClient.get(payloadKey)).thenReturn(storedJson);
+
+   List<SessionRequest> available = queue.getNextAvailable(Map.of());
+
+   assertThat(available).hasSize(1);
+   assertThat(available.get(0).getRequestId()).isEqualTo(id);
+
+   // Expected Redis traffic: one pop from the list, one payload read.
+   verify(mockCommands).rpop("session:queue");
+   verify(mockRedisClient).get(payloadKey);
+ }
+
+ /** Nothing is returned when {@code rpop} finds no id in the list. */
+ @Test
+ void getNextAvailableShouldReturnEmptyWhenQueueIsEmpty() {
+   String queueKey = "session:queue";
+   when(mockCommands.rpop(queueKey)).thenReturn(null);
+
+   List<SessionRequest> available = queue.getNextAvailable(Map.of());
+
+   assertThat(available).isEmpty();
+ }
+
+ /** A popped id whose payload key holds no value produces an empty result. */
+ @Test
+ void getNextAvailableShouldReturnEmptyWhenRequestDataIsMissing() {
+   String idText = new RequestId(UUID.randomUUID()).toString();
+   when(mockCommands.rpop("session:queue")).thenReturn(idText);
+   when(mockRedisClient.get("session:request:" + idText)).thenReturn(null);
+
+   List<SessionRequest> available = queue.getNextAvailable(Map.of());
+
+   assertThat(available).isEmpty();
+ }
+
+ /** A Redis failure during {@code rpop} produces an empty result instead of an exception. */
+ @Test
+ void getNextAvailableShouldReturnEmptyOnRedisException() {
+   doThrow(new RedisCommandExecutionException("Redis error"))
+       .when(mockCommands)
+       .rpop("session:queue");
+
+   List<SessionRequest> available = queue.getNextAvailable(Map.of());
+
+   assertThat(available).isEmpty();
+ }
+
+ /** Completing a request reports success and deletes both per-request keys. */
+ @Test
+ void completeShouldReturnTrueAndCleanupRequestData() {
+   RequestId id = new RequestId(UUID.randomUUID());
+   CreateSessionResponse created = mock(CreateSessionResponse.class);
+   Either<SessionNotCreatedException, CreateSessionResponse> success = Either.right(created);
+
+   boolean completed = queue.complete(id, success);
+
+   assertThat(completed).isTrue();
+
+   // Both keys belonging to the request must be removed in one del call.
+   String payloadKey = "session:request:" + id;
+   String enqueueTimeKey = "session:enqueue_time:" + id;
+   verify(mockRedisClient).del(payloadKey, enqueueTimeKey);
+ }
+
+ @Test
+ void completeShouldReturnFalseOnRedisException() {
+ RequestId requestId = new RequestId(UUID.randomUUID());
+ CreateSessionResponse response = mock(CreateSessionResponse.class);
+ Either<SessionNotCreatedException, CreateSessionResponse> result = Either.right(response);
+
+ doThrow(new RedisCommandExecutionException("Redis error"))
+ .when(mockRedisClient)
+ .del(anyString(), anyString());
+
+ boolean completed = queue.complete(requestId, result);
+
+ assertThat(completed).isFalse();
+ }
+
+ /**
+  * Clearing the queue returns the number of queued ids and deletes every per-request key
+  * together with the list itself in one {@code del} call.
+  */
+ @Test
+ void clearQueueShouldRemoveAllRequests() {
+   String firstId = new RequestId(UUID.randomUUID()).toString();
+   String secondId = new RequestId(UUID.randomUUID()).toString();
+   when(mockCommands.lrange("session:queue", 0, -1)).thenReturn(List.of(firstId, secondId));
+
+   int cleared = queue.clearQueue();
+
+   assertThat(cleared).isEqualTo(2);
+
+   // Expected Redis traffic: read the whole list, then one bulk delete.
+   verify(mockCommands).lrange("session:queue", 0, -1);
+   verify(mockRedisClient)
+       .del(
+           "session:request:" + firstId,
+           "session:enqueue_time:" + firstId,
+           "session:request:" + secondId,
+           "session:enqueue_time:" + secondId,
+           "session:queue");
+ }
+
+ /** Clearing an already empty list reports zero removed requests. */
+ @Test
+ void clearQueueShouldReturn0WhenQueueIsEmpty() {
+   List<String> noIds = List.of();
+   when(mockCommands.lrange("session:queue", 0, -1)).thenReturn(noIds);
+
+   int cleared = queue.clearQueue();
+
+   assertThat(cleared).isEqualTo(0);
+ }
+
+ /** A Redis failure while listing the queue is reported as zero removed requests. */
+ @Test
+ void clearQueueShouldReturn0OnRedisException() {
+   doThrow(new RedisCommandExecutionException("Redis error"))
+       .when(mockCommands)
+       .lrange("session:queue", 0, -1);
+
+   int cleared = queue.clearQueue();
+
+   assertThat(cleared).isEqualTo(0);
+ }
+
+ /** Every queued id with a stored payload appears in the contents, in list order. */
+ @Test
+ void getQueueContentsShouldReturnAllRequests() {
+   RequestId firstId = new RequestId(UUID.randomUUID());
+   RequestId secondId = new RequestId(UUID.randomUUID());
+   String firstKey = "session:request:" + firstId;
+   String secondKey = "session:request:" + secondId;
+   String firstJson = JSON.toJson(createSessionRequest(firstId));
+   String secondJson = JSON.toJson(createSessionRequest(secondId));
+
+   when(mockCommands.lrange("session:queue", 0, -1))
+       .thenReturn(List.of(firstId.toString(), secondId.toString()));
+   when(mockRedisClient.mget(firstKey, secondKey))
+       .thenReturn(
+           List.of(KeyValue.just(firstKey, firstJson), KeyValue.just(secondKey, secondJson)));
+
+   List<SessionRequestCapability> contents = queue.getQueueContents();
+
+   assertThat(contents).hasSize(2);
+   assertThat(contents.get(0).getRequestId()).isEqualTo(firstId);
+   assertThat(contents.get(1).getRequestId()).isEqualTo(secondId);
+ }
+
+ /** An empty Redis list results in empty queue contents. */
+ @Test
+ void getQueueContentsShouldReturnEmptyWhenQueueIsEmpty() {
+   List<String> noIds = List.of();
+   when(mockCommands.lrange("session:queue", 0, -1)).thenReturn(noIds);
+
+   List<SessionRequestCapability> contents = queue.getQueueContents();
+
+   assertThat(contents).isEmpty();
+ }
+
+ /** A Redis failure while listing the queue results in empty queue contents. */
+ @Test
+ void getQueueContentsShouldReturnEmptyOnRedisException() {
+   doThrow(new RedisCommandExecutionException("Redis error"))
+       .when(mockCommands)
+       .lrange("session:queue", 0, -1);
+
+   List<SessionRequestCapability> contents = queue.getQueueContents();
+
+   assertThat(contents).isEmpty();
+ }
+
+ /** Queued ids whose payload is absent from the {@code mget} reply are skipped. */
+ @Test
+ void getQueueContentsShouldHandleMissingRequestData() {
+   RequestId presentId = new RequestId(UUID.randomUUID());
+   RequestId missingId = new RequestId(UUID.randomUUID());
+   String presentKey = "session:request:" + presentId;
+   String missingKey = "session:request:" + missingId;
+   String presentJson = JSON.toJson(createSessionRequest(presentId));
+
+   when(mockCommands.lrange("session:queue", 0, -1))
+       .thenReturn(List.of(presentId.toString(), missingId.toString()));
+   // The second entry is an empty KeyValue, i.e. the payload is missing.
+   when(mockRedisClient.mget(presentKey, missingKey))
+       .thenReturn(List.of(KeyValue.just(presentKey, presentJson), KeyValue.empty(missingKey)));
+
+   List<SessionRequestCapability> contents = queue.getQueueContents();
+
+   assertThat(contents).hasSize(1);
+   assertThat(contents.get(0).getRequestId()).isEqualTo(presentId);
+ }
+
+ /** Closing the queue closes the underlying Redis client exactly once. */
+ @Test
+ void closeShouldCloseRedisConnection() {
+   queue.close();
+
+   verify(mockRedisClient, times(1)).close();
+ }
+
+ /** An exception thrown by the Redis client on close must not escape {@code queue.close()}. */
+ @Test
+ void closeShouldHandleRedisException() {
+   RuntimeException closeFailure = new RuntimeException("Close error");
+   doThrow(closeFailure).when(mockRedisClient).close();
+
+   assertThatCode(queue::close).doesNotThrowAnyException();
+ }
+
+ /** The reported queue size is the length of the Redis list. */
+ @Test
+ void getQueueSizeShouldReturnCorrectSize() {
+   doReturn(5L).when(mockCommands).llen("session:queue");
+
+   long reported = queue.getQueueSize();
+
+   assertThat(reported).isEqualTo(5L);
+ }
+
+ /** A Redis failure while reading the list length is reported as a size of -1. */
+ @Test
+ void getQueueSizeShouldReturnNegativeOneOnException() {
+   doThrow(new RedisCommandExecutionException("Redis error"))
+       .when(mockCommands)
+       .llen("session:queue");
+
+   long reported = queue.getQueueSize();
+
+   assertThat(reported).isEqualTo(-1L);
+ }
+
+ /** The queue exposes the Redis URI it was constructed with, as a string. */
+ @Test
+ void getRedisUriShouldReturnConfiguredUri() {
+   String expected = redisUri.toString();
+
+   assertThat(queue.getRedisUri()).isEqualTo(expected);
+ }
+
+ /**
+  * Builds a {@code POST /session} request whose body asks for the chrome browser and wraps it
+  * in a {@link SessionRequest} for {@code requestId}, passing {@code Instant.now()} as the
+  * timestamp argument.
+  */
+ private SessionRequest createSessionRequest(RequestId requestId) {
+   String payload = "{\"capabilities\":{\"browserName\":\"chrome\"}}";
+   HttpRequest newSession = new HttpRequest(POST, "/session");
+   newSession.setContent(Contents.utf8String(payload));
+   Instant createdAt = Instant.now();
+   return new SessionRequest(requestId, newSession, createdAt);
+ }
+
+ /**
+  * Builds a {@code POST /session} request whose body embeds the JSON form of
+  * {@code capabilities} and wraps it in a {@link SessionRequest} for {@code requestId}.
+  *
+  * <p>Not referenced by any test in this class at the moment.
+  */
+ private SessionRequest createSessionRequestWithCapabilities(
+     RequestId requestId, Capabilities capabilities) {
+   String payload = "{\"capabilities\":" + JSON.toJson(capabilities) + "}";
+   HttpRequest newSession = new HttpRequest(POST, "/session");
+   newSession.setContent(Contents.utf8String(payload));
+   Instant createdAt = Instant.now();
+   return new SessionRequest(requestId, newSession, createdAt);
+ }
+}