# blob: f738a0f708311568d9a84527087117c32a27faa7 [file] [log] [blame] [edit]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# Runs the touchpad drag latency test using QuickStep v2
# Usage example:
# $ python qstep.py 6
# Input device : /dev/input/event6
# Serial device : /dev/ttyACM1
# Laser log file : /tmp/QuickStep_2016_01_21__1554_22_lsaer.log
# evtest log file: /tmp/QuickStep_2016_01_21__1554_22_evtest.log
# Clock zeroed at 1453409662 (rt 0.259ms)
# ........................................
# Processing data, may take a minute or two...
# Drag latency (min method) = 19.62 ms
# Average Maximum Minimum
# 0.0237723313845 0.0333168506622 0.0167829990387
#
# Note, before running this script, check that evtest can grab the device.
import argparse
import glob
import os
import subprocess
import sys
import tempfile
import time
import serial
import numpy
from . import latency_measurement as lm
from . import minimization
# Teensy commands (always a single char). Defined in QuickStep.ino
# TODO(kamrik): link to QuickStep.ino once it's opensourced.
# Each command is sent with sndrcv(), which writes it and reads a one-line
# reply from the serial port.
CMD_RESET = 'F'  # sent once, right after the serial port is opened
CMD_SYNC_ZERO = 'Z'  # tells the Teensy to zero its clock
CMD_TIME_NOW = 'T'  # used as a ping when measuring the USB round trip
CMD_AUTO_LASER_ON = 'L'  # turns on laser trigger auto-sending
CMD_AUTO_LASER_OFF = 'l'  # sent once the requested number of triggers is read
# Globals
# When True, log() prints its messages; set from -d/--debug in parse_args().
debug_mode = False
def log(msg):
    """Print msg, but only when the module-level debug_mode flag is set."""
    if not debug_mode:
        return
    print(msg)
def sndrcv(ser, data):
    """Send a command over the serial port and wait for a one-line reply.

    Args:
        ser: open serial port object providing write() and readline().
        data: command string; it is encoded to bytes before being written.

    Returns:
        A (dt, reply) tuple: the round-trip time in milliseconds and the
        decoded reply line exactly as read (not stripped).
    """
    sent_at = time.time()
    ser.write(data.encode())
    raw_reply = ser.readline()
    reply = raw_reply.decode()
    # The end timestamp is taken after decoding, so dt includes decode time.
    received_at = time.time()
    dt = (received_at - sent_at) * 1000
    summary = 'sndrcv(): round trip %.3fms, reply=%s' % (dt, reply.strip())
    log(summary)
    return dt, reply
def runCommStats(ser, N=100):
    """Measure the USB serial round trip time.

    Flushes the serial input, sends CMD_SYNC_ZERO once, then sends
    CMD_TIME_NOW to the Teensy N times, measuring the round trip each time.
    The stats (min, median, max) go through log(), so they are only shown in
    debug mode.

    Args:
        ser: open serial port object.
        N: number of round trips to measure.

    Exits the process with status 2 if the median round trip exceeds 2 ms.
    """
    log('Running USB comm stats...')
    ser.flushInput()
    sndrcv(ser, CMD_SYNC_ZERO)
    # Only the round-trip time (first element of sndrcv's result) is needed.
    times = numpy.array([sndrcv(ser, CMD_TIME_NOW)[0] for _ in range(N)])
    median = numpy.median(times)
    log('USB comm round trip stats:')
    log('min=%.2fms, median=%.2fms, max=%.2fms N=%d'
        % (times.min(), median, times.max(), N))
    if median > 2:
        print('ERROR: the median round trip is too high: %.2f' % median)
        sys.exit(2)
def zeroClock(ser, max_delay_ms=1.0, retries=10):
    """Tell the TeensyUSB to zero its clock (CMD_SYNC_ZERO).

    This is the simple zeroing used when the round trip is fast.
    It does not employ the same method as Android clock sync.

    Args:
        ser: open serial port object.
        max_delay_ms: the zeroing is accepted only if the reply arrived
            within this many milliseconds.
        retries: maximum number of zeroing attempts.

    Returns:
        The host time (time.time()) taken just before the accepted command
        was sent, or -1 if no attempt completed within max_delay_ms.
    """
    # Check that we get reasonable ping time with Teensy
    # this also 'warms up' the comms, first msg is often slower
    runCommStats(ser, N=10)
    ser.flushInput()
    for _ in range(retries):
        t0 = time.time()
        dt, _reply = sndrcv(ser, CMD_SYNC_ZERO)
        if dt < max_delay_ms:
            print('Clock zeroed at %.0f (rt %.3fms)' % (t0, dt))
            return t0
    # The original message had a %d placeholder but never supplied the value.
    print('Error, failed to zero the clock after %d retries' % retries)
    return -1
def parseTrigger(trigger_line):
    """Parse a trigger line from QuickStep.

    Trigger events look like this: "G L 12902345 1 1"
    The parts:
     * G - common for all trigger events
     * L - means laser
     * 12902345 is timestamp in us since zeroed
     * 1st 1 or 0 is trigger value. 0 = changed to dark, 1 = changed to light,
     * 2nd 1 is counter of how many times this trigger happened since last
       readout, should always be 1 in our case

    Returns:
        A (t, val) tuple: the timestamp converted to seconds (float) and the
        trigger value (int).

    Raises:
        ValueError: if the line does not have exactly five whitespace
            separated fields, or the timestamp/value fields are not integers.
    """
    parts = trigger_line.strip().split()
    if len(parts) != 5:
        # ValueError is still an Exception, so existing handlers keep working.
        raise ValueError('Malformed laser trigger line:\n' + trigger_line)
    _tag, _source, t_us, val, _count = parts
    return (int(t_us) / 1e6, int(val))
def parse_args():
    """Parse the command line and set the module-level debug_mode flag.

    The default serial port is the first match of /dev/ttyACM* if any exists,
    otherwise /dev/ttyACM0. The default log directory is the system temp dir.

    Returns:
        The argparse namespace with attributes input, serial, logdir, n and
        debug. A purely numeric input such as "6" is expanded to
        "/dev/input/event6"; anything else is passed through unchanged.
    """
    temp_dir = tempfile.gettempdir()
    # Named default_serial so it does not shadow the imported serial module.
    default_serial = '/dev/ttyACM0'
    # Try to autodetect the QuickStep serial port
    ls_ttyACM = glob.glob('/dev/ttyACM*')
    if ls_ttyACM:
        default_serial = ls_ttyACM[0]
    description = "Run the touchpad drag latency test using QuickStep v2"
    parser = argparse.ArgumentParser(
        description=description,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('input',
                        help='input device, e.g: 6 or /dev/input/event6')
    parser.add_argument('-s', '--serial', default=default_serial,
                        help='QuickStep serial port')
    parser.add_argument('-l', '--logdir', default=temp_dir,
                        help='where to store logs')
    parser.add_argument('-n', default=40, type=int,
                        help='Number of laser toggles to read')
    parser.add_argument('-d', '--debug', action='store_true',
                        help='talk more')
    args = parser.parse_args()
    global debug_mode
    debug_mode = args.debug
    # Only a bare event number is expanded. isalnum() would also match names
    # like "event6" and turn them into "/dev/input/eventevent6".
    if args.input.isdigit():
        args.input = '/dev/input/event' + args.input
    return args
# Script entry point: zero the QuickStep clock, record evtest output and laser
# triggers into two log files, then hand both files to latency_measurement.
if __name__ == '__main__':
    args = parse_args()
    # Create names for log files
    prefix = time.strftime('QuickStep_%Y_%m_%d__%H%M_%S')
    laser_file_name = os.path.join(args.logdir, prefix + '_lsaer.log')
    evtest_file_name = os.path.join(args.logdir, prefix + '_evtest.log')
    print('Input device   : ' + args.input)
    print('Serial device  : ' + args.serial)
    print('Laser log file : ' + laser_file_name)
    print('evtest log file: ' + evtest_file_name)
    with serial.Serial(args.serial, 115200) as ser:
        sndrcv(ser, CMD_RESET)
        t_zero = zeroClock(ser)
        if t_zero < 0:
            print('Error: Couldn\'t zero clock, exitting')
            sys.exit(1)
        with open(evtest_file_name, 'w') as evtest_log, \
                open(laser_file_name, 'at') as flaser:
            # Fire up the evtest process. It is started directly (no shell)
            # so that terminate() signals evtest itself rather than a wrapper
            # shell, and so that paths with spaces or shell metacharacters
            # are passed through safely.
            evtest = subprocess.Popen(['evtest', args.input],
                                      stdout=evtest_log)
            try:
                # Turn on laser trigger auto-sending
                sndrcv(ser, CMD_AUTO_LASER_ON)
                for trigger_count in range(1, args.n + 1):
                    # The following line blocks until a message from
                    # QuickStep arrives
                    trigger_line = ser.readline().decode()
                    log('#%d/%d - ' % (trigger_count, args.n) +
                        trigger_line.strip())
                    if not debug_mode:
                        sys.stdout.write('.')
                        sys.stdout.flush()
                    t, val = parseTrigger(trigger_line)
                    # Trigger times are relative to the zeroed clock; shift
                    # them by the host time at which the clock was zeroed.
                    t += t_zero
                    flaser.write('%.3f %d\n' % (t, val))
                sndrcv(ser, CMD_AUTO_LASER_OFF)
            finally:
                # Send SIGTERM to evtest process, even if the loop failed,
                # and wait for it to exit before its log file is read.
                evtest.terminate()
                evtest.wait()
    print("\nProcessing data, may take a minute or two...")
    lm.main(evtest_file_name, laser_file_name)