Parse messages on arrival

Rather than queuing inbound messages, the HubController now saves
pending promises/rejects for each pending request. Each inbound packet
is checked at the time of arrival, and if the ID matches a pending
response, the corresponding promise is called.

This fixes a problem where the longer the time between reads, the more
garbage responses queue up that are guaranteed to get thrown away the
next time the next response was gathered.
This commit is contained in:
jakergrossman 2021-11-18 00:29:27 -06:00
parent 8ac7e2c5ea
commit 92d42edaff
3 changed files with 100 additions and 108 deletions

View File

@ -246,7 +246,7 @@ let hub: HubManager | null;
async function connectHub(): Promise<void> {
if (hub) {
vscode.window.showWarningMessage('LEGO Hub is already connected, reconnecting...');
hub.close();
disconnectHub();
}
const config = vscode.workspace.getConfiguration();
@ -283,7 +283,7 @@ async function connectHub(): Promise<void> {
}
async function disconnectHub(): Promise<void> {
if (!hub) {
if (!hub || !hub.isOpen()) {
vscode.window.showErrorMessage('LEGO Hub is not connected');
return;
}
@ -294,7 +294,7 @@ async function disconnectHub(): Promise<void> {
}
async function uploadCurrentFile(): Promise<void> {
if (!hub) {
if (!hub || !hub.isOpen()) {
vscode.window.showErrorMessage('LEGO Hub is not connected!');
return;
}
@ -327,7 +327,7 @@ async function uploadCurrentFile(): Promise<void> {
// TODO: find empty slots
async function runProgram(): Promise<void> {
if (!hub) {
if (!hub || !hub.isOpen()) {
vscode.window.showErrorMessage('LEGO Hub is not connected!');
return;
}
@ -348,7 +348,7 @@ async function runProgram(): Promise<void> {
}
async function stopExecution(): Promise<void> {
if (!hub) {
if (!hub || !hub.isOpen()) {
vscode.window.showErrorMessage('LEGO Hub is not connected!');
return;
}
@ -359,7 +359,7 @@ async function stopExecution(): Promise<void> {
// TODO: find slots from status
async function deleteProgram(): Promise<void> {
if (!hub) {
if (!hub || !hub.isOpen()) {
vscode.window.showErrorMessage('LEGO Hub is not connected!');
return;
}

View File

@ -5,6 +5,14 @@ import commands from './commands';
import AccessNodeProvider from './accessNodeProvider';
// create output channel
const outputChannel = vscode.window.createOutputChannel("SPIKE Prime Output");
export function MindReaderOutput(line: string) {
outputChannel.show();
outputChannel.appendLine(line);
}
let parser: pl.Parser = new pl.Parser();
export function activate(context: vscode.ExtensionContext) {

View File

@ -2,6 +2,7 @@ import * as vscode from 'vscode';
import * as SerialPort from 'serialport';
import * as fs from 'fs';
import { performance } from 'perf_hooks';
import { MindReaderOutput } from './extension';
/**
* @type RPCRequest an RPC request message
@ -48,8 +49,8 @@ type HubOptions = {
*/
export default class HubManager {
private port: SerialPort;
private receiveQueue: string[] = []; // queue of received messages to handle
public responses: RPCResponse[] = []; // list of messages returned to the user
private receiveBuffer: string = ''; // buffer for in-flight messages
private pendingRequests = new Map<string, [(result: any) => void, (error: string) => void]>();
// ======================== INSTANCE METHODS ========================
@ -58,6 +59,65 @@ export default class HubManager {
*/
private constructor(public options: HubOptions) { }
public isOpen(): boolean {
return this.port.isOpen;
}
/**
* Handle a received data chunk from the serial port
*
* @param `data` Data received from serial port
*/
private async receiveData(data: string) {
// add data to buffer
this.receiveBuffer += data;
// get full lines in buffer
let msgs = this.receiveBuffer.split(/\r/); // split by newline
this.receiveBuffer = msgs.pop()!; // store unhandled data
msgs = msgs.filter(x => !x.startsWith('{"m":0,"p":')); // drop sensor broadcast response spam
for (const msg of msgs) {
// check if this msg is a response to a pending request
try {
let json: { [key: string]: any };
json = JSON.parse(msg);
let id = json['i'];
if (id && this.pendingRequests.has(id)) {
// a request is waiting on this response
let [resolve, reject] = this.pendingRequests.get(id) ?? [];
if (json['e'] && reject) {
// error
reject(Buffer.from(json['e'], 'base64').toString());
} else if (resolve) {
resolve(json['r']);
}
this.pendingRequests.delete(id);
} else if (json['m']) {
// Print errors
const params = json['p'];
switch (json['m']) {
case 'user_program_error':
MindReaderOutput(Buffer.from(params[3], 'base64').toString());
MindReaderOutput(Buffer.from(params[4], 'base64').toString());
break;
case 'runtime_error':
MindReaderOutput(Buffer.from(params[3], 'base64').toString());
break;
}
}
} catch (err) {
console.log('Could not parse JSON:', msg);
}
}
}
/**
* Initializes a created HubManager with the current option settings
*/
@ -79,8 +139,10 @@ export default class HubManager {
// push lines received to data queue
let rl = this.port.pipe(new SerialPort.parsers.Readline({delimiter: '\r'}));
rl.on('data', data => this.receiveQueue.push(data));
let mgr = this;
this.port.on('data', data => {
mgr.receiveData(data)
});
}
public async close(): Promise<void> {
@ -97,99 +159,18 @@ export default class HubManager {
*/
// TODO: make send take a single RPCRequest argument, made inline in each function
public async send(request: RPCRequest): Promise<RPCResponse> {
return new Promise(resolve => {
return new Promise((resolve, reject) => {
if (request['i'] === undefined) {
// generate an ID
request['i'] = 'mind-reader-' + HubManager.randomID();
}
// write JSON to port
this.pendingRequests.set(request['i'], [resolve, reject]);
this.port.write(JSON.stringify(request));
this.port.write('\r', async () => {
if (request['i']) {
// expecting a response
let response = await this.recv(request['i']);
resolve(response);
}
});
});
}
/**
* Receive an RPC message.
*
* @param `id` The id to match received messages against. Use `null` to match *all* messages
*/
public async recv(id: string | null): Promise<RPCResponse> {
let index = 0; // index into receive qeueue
let startTime = performance.now();
return new Promise(async (resolve) => {
// used for non-blocking "wait-until" behavior
let rcv = () => {
let elapsedTime = performance.now() - startTime;
if (this.options.timeout !== null && elapsedTime >= this.options.timeout!) {
return resolve({
'e': 'Timed out while receiving message',
'i': id
});
}
// check that there is more data in the queue
if (index < this.receiveQueue.length) {
// get next message in queue
let r = this.receiveQueue[index];
index++;
try {
let j = JSON.parse(r);
// check for matching id
if (id === null || 'i' in j && j['i'] === id) {
let response: RPCResponse;
// is response an error?
if ('e' in j) {
// decode error from base64
let error = JSON.parse(Buffer.from(j['e'], 'base64').toString('ascii'));
response = {
'i': id,
'e': error
};
this.responses.push(response);
return resolve(response);
} else {
response = j;
}
// trim start of queue (just processed)
this.receiveQueue = this.receiveQueue.slice(index);
// return response object
this.responses.push(response);
return resolve(response);
} else {
// not at end of queue, eager retry
setTimeout(rcv, 0);
}
} catch (err) {
// TODO: parse print statements somehow
//console.debug('Could not parse json response: "' + r + '"');
// not at end of queue, eager retry
setTimeout(rcv, 0);
}
} else {
// no more data in queue, wait
// before attempting again
setTimeout(rcv, 1000);
}
};
rcv();
this.port.write('\r');
this.port.drain();
});
}
@ -199,9 +180,9 @@ export default class HubManager {
name = name || file;
const now = performance.now();
const ack = await this.startWriteProgram(name, size, slotid, now, now);
const blockSize = ack['r'].blocksize;
const transferid = ack['r'].transferid;
const ack: {[key: string]: any} = await this.startWriteProgram(name, size, slotid, now, now);
const blockSize = ack.blocksize;
const transferid = ack.transferid;
const numBlocks = Math.ceil(size / blockSize);
@ -226,14 +207,17 @@ export default class HubManager {
* @param `slotid` Slot ID of the program to run
*/
public async programExecute(slotid: number): Promise<RPCResponse> {
return Promise.resolve(
await this.send({
return new Promise(async (resolve) => {
let response = await this.send({
'm': 'program_execute',
'p': {
'slotid': slotid
}
})
);
});
resolve(response);
});
}
/**
@ -285,7 +269,7 @@ export default class HubManager {
'name': name,
'type': 'python', // always python
// eslint-disable-next-line @typescript-eslint/naming-convention
'project_id': '50uN1ZaRpHj2', // TODO: check this
'project_id': HubManager.randomID(16),
};
return Promise.resolve(