Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion external_locking.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ function prepare(config) {
* @param {import('./runner').RunnerState} state
*/
async function refresh(state) {
const {config, locks} = state;
const {config} = state;
assert(state.locking);

const {locks} = state.locking;
assert(locks);
if (locks.size > 0) {
const locksArray = Array.from(locks);
Expand Down
126 changes: 109 additions & 17 deletions locking.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ const output = require('./output');
const {wait} = require('./utils');
const external_locking = require('./external_locking');

/**
* @typedef {{resource: string, client: string, expireIn: number}} Lock
*/

/**
* @typedef {{locks: Set<string>, by_task: Map<string, Set<string>>, pending: Set<string>}} LockingState
*/

function annotateTaskResources(config, task) {
if (config.no_locking) {
Expand All @@ -27,7 +34,11 @@ function annotateTaskResources(config, task) {
async function init(state) {
assert(state);
assert(state.config);
state.locks = new Set();
state.locking = {
locks: new Set(),
by_task: new Map(),
pending: new Set(),
};
external_locking.init(state);
}

Expand All @@ -37,10 +48,9 @@ async function init(state) {
*/
async function shutdown(config, state) {
external_locking.shutdown(state);
state.locks.length = 0;
assert.equal(
state.locks.size, 0,
`Still got some locks on shutdown: ${Array.from(state.locks).sort().join(',')}`);
state.locking.locks.size, 0,
`Still got some locks on shutdown: ${Array.from(state.locking.locks).sort().join(',')}`);
}

/**
Expand All @@ -58,8 +68,9 @@ async function acquire(config, state, task) {
return true;
}

const {locks} = state;
assert(locks);
assert(state.locking);

const {locks, by_task, pending} = state.locking;
if (task.resources.some(r => locks.has(r))) {
if (config.locking_verbose) {
const failed = task.resources.filter(r => locks.has(r));
Expand All @@ -70,7 +81,11 @@ async function acquire(config, state, task) {
}

if (! config.no_external_locking) {
task.resources.forEach(r => pending.add(r));
try {
// TODO: There is no guarantee that all locking attempts are successful.
// I have the suspicion that only some of those may be acquired and if
// everybody has a lock someone else needs, we will starve each other indefinitely
const acquireRes = await external_locking.externalAcquire(config, task.resources, 40000);
if (acquireRes !== true) {
if (config.locking_verbose) {
Expand All @@ -83,18 +98,36 @@ async function acquire(config, state, task) {
} catch(e) {
output.log(config, `[exlocking] Failed to acquire locks for ${task.id}: ${e.stack}`);
return false;
} finally {
task.resources.forEach(r => pending.delete(r));
}
}

if (! by_task.has(task.id)) {
by_task.set(task.id, new Set());
}
const taskLocks = by_task.get(task.id);
for (const r of task.resources) {
locks.add(r);
taskLocks.add(r);
}
if (config.locking_verbose) {
output.log(config, `[locking] ${task.id}: Acquired ${task.resources.join(',')}`);
}
return true;
}

/**
* @param {(waitTime: number) => Promise<boolean>} fn
*/
async function runEventually(fn) {
let waitTime = 50;
while (! await fn(waitTime)) {
await wait(waitTime);
waitTime = Math.min(10000, waitTime * 2);
}
}

/**
* @param {import('./config').Config} config
* @param {import('./runner').RunnerState} state
Expand All @@ -105,12 +138,66 @@ async function acquireEventually(config, state, task) {
if (config.locking_verbose) {
output.log(config, `[locking] ${task.id}: Trying to eventually acquire ${task.resources.join(',')}`);
}
let waitTime = 50;
while (! await acquire(config, state, task)) {
await wait(waitTime);
waitTime = Math.min(10000, waitTime * 2);
return await runEventually(
() => acquire(config, state, task)
);
}

/**
* @param {import('./config').Config} config
* @param {string[]} pool
* @param {number} [count=1] Amount of resources to lock from pool
* @returns {Promise<string[]>} Array with successfully locked ids
*/
async function acquireFromPool(config, pool, count = 1) {
if (config.no_locking) return true;

const taskId = config._taskId;

if (config.locking_verbose) {
output.log(config, `[locking] ${taskId}: Trying to eventually acquire one of ${pool.join(',')}`);
}
return true;

let out = [];
await runEventually(
async (waitTime) => {
/** @type {LockingState} */
const locking = config._locking;

let currentLocks = locking.locks;
if (! config.no_external_locking) {
const used = await external_locking.externalList(config);
currentLocks = new Set(used.map(l => l.resource));
}

const available = [];
for (let i = 0; i < pool.length; i++) {
const r = pool[i];
if (! currentLocks.has(r) && ! locking.pending.has(r)) {
available.push(r);
}

if (available.length >= count) {
break;
}
}

if (available.length < count) {
if (config.locking_verbose) {
output.log(config, `[locking] Failed to acquire lock. Sleeping for ${waitTime}ms. Pool: ${pool.join(', ')}`);
}
return false;
}

const result = acquire(config, {locking}, {id: taskId, resources: available});
if (result) {
out = available;
}
return result;
}
);

return out;
}

/**
Expand All @@ -121,13 +208,16 @@ async function acquireEventually(config, state, task) {
*/
async function release(config, state, task) {
if (config.no_locking) return true;
if (! task.resources.length) {
if (! state.locking.by_task.has(task.id)) {
return;
}

const {locks, by_task} = state.locking;
const taskLocks = by_task.get(task.id);

if (! config.no_external_locking) {
try {
const response = await external_locking.externalRelease(config, task.resources);
const response = await external_locking.externalRelease(config, Array.from(taskLocks));
if (response !== true) {
if (config.locking_verbose) {
output.log(config,
Expand All @@ -140,13 +230,14 @@ async function release(config, state, task) {
}
}

const {locks} = state;
for (const r of task.resources) {
assert(locks.has(r), `Trying to release ${r} for ${task.id}, but not in current locks ${Array.from(locks).sort().join(',')}`);

for (const r of taskLocks) {
assert(locks.has(r), `Trying to release ${r} for ${task.id}, but not in current locks ${Array.from(taskLocks).sort().join(',')}`);
locks.delete(r);
taskLocks.delete(r);
}
if (config.locking_verbose) {
output.log(config, `[locking] ${task.id}: Released ${task.resources.join(',')}`);
output.log(config, `[locking] ${task.id}: Released ${Array.from(taskLocks).join(',')}`);
}
}

Expand Down Expand Up @@ -178,6 +269,7 @@ function listConflicts(config, tasks) {
module.exports = {
acquire,
acquireEventually,
acquireFromPool,
annotateTaskResources,
init,
listConflicts,
Expand Down
13 changes: 8 additions & 5 deletions runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ const version = require('./version');
const {timeoutPromise} = require('./promise_utils');


async function run_task(config, task) {
async function run_task(config, task, state) {
const task_config = {
...config,
_locking: state.locking,
_taskLocks: new Set(),
_browser_pages: [],
_testName: task.tc.name,
_taskName: task.name,
_taskId: task.id,
};
let timeout;
try {
Expand Down Expand Up @@ -143,7 +146,7 @@ async function sequential_run(config, state) {

task.status = 'running';
task.start = performance.now();
await run_task(config, task);
await run_task(config, task, state);

await locking.release(config, state, task);
}
Expand All @@ -158,7 +161,7 @@ async function run_one(config, state, task) {
task.start = performance.now();
output.status(config, state);

await run_task(config, task);
await run_task(config, task, state);

output.status(config, state);
return task;
Expand Down Expand Up @@ -245,7 +248,7 @@ async function parallel_run(config, state) {


/**
* @typedef {{tc: any, status: string, name: string, id: string, skipReason?: string, expectedToFail?: boolean | ((config: any) => boolean)}} Task
* @typedef {{tc: any, status: string, name: string, id: string, skipReason?: string, expectedToFail?: boolean | ((config: any) => boolean), locks?: Set<string>}} Task
*/

function testCases2tasks(config, testCases) {
Expand Down Expand Up @@ -296,7 +299,7 @@ function testCases2tasks(config, testCases) {
}

/**
* @typedef {{config: any, tasks: Task[], locks?: Set<string> }} RunnerState
* @typedef {{config: any, tasks: Task[], locking?: import('./locking').LockingState }} RunnerState
*/

async function run(config, testCases) {
Expand Down
48 changes: 48 additions & 0 deletions tests/selftest_locking_pool.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
const assert = require('assert').strict;
const path = require('path');

const {wait} = require('../utils');
const locking = require('../locking');

async function run(config) {
const baseUrl = `${config.pentf_lockserver_url}test_lockserver_${Math.random()
.toString(36)
.slice(2)}`;
const config1 = {
...config,
no_locking: false,
external_locking_client: 'test_lockserver ONE',
external_locking_url: baseUrl,
};

const pool = ['AAA', 'BBB', 'CCC', 'DDD', 'EEE'].map(r => path.basename(__filename)+r);

const lock1 = await locking.acquireFromPool(config1, pool, 1);
assert.deepEqual(lock1, [pool[0]]);

const lock2 = await locking.acquireFromPool(config1, pool, 1);
assert.deepEqual(lock2, [pool[1]]);

const lock3 = await locking.acquireFromPool(config1, pool, 1);
assert.deepEqual(lock3, [pool[2]]);

// Acquire multiple locks at once
const lock4 = await locking.acquireFromPool(config1, pool, 2);
assert.deepEqual(lock4, [pool[3], pool[4]]);


// At this point all resources of the pool are exhausted.
// We should not be able to get a lock.
const res = await Promise.race([locking.acquireFromPool(config1, pool, 1), wait(100)]);
assert.equal(res, undefined);

// Free one resource
await locking.release(config, {locking: config._locking}, {id: config._taskId});
const lock5 = await locking.acquireFromPool(config1, pool, 1);
assert.deepEqual(lock5, [pool[0]]);
}

module.exports = {
description: 'Lock random item from resource pool',
run,
};