This commit is contained in:
y5nw 2024-05-17 20:50:53 +00:00 committed by GitHub
commit 63fcdef948
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 376 additions and 75 deletions

View File

@ -8,15 +8,34 @@ function core.async_event_handler(jobid, retval)
core.async_jobs[jobid] = nil core.async_jobs[jobid] = nil
end end
function core.handle_async(func, callback, ...) local function prepare_async_args(func, callback, ...)
assert(type(func) == "function" and type(callback) == "function", assert(type(func) == "function" and type(callback) == "function",
"Invalid minetest.handle_async invocation") "Invalid invocation of minetest.handle_async or AsyncJob:replace")
local args = {n = select("#", ...), ...} local args = {n = select("#", ...), ...}
local mod_origin = core.get_last_run_mod() local mod_origin = core.get_last_run_mod()
local jobid = core.do_async_callback(func, args, mod_origin) return func, args, mod_origin
core.async_jobs[jobid] = callback
return true
end end
function core.handle_async(func, callback, ...)
local job = core.do_async_callback(prepare_async_args(func, callback, ...))
core.async_jobs[job:get_id()] = callback
return job
end
if core.async_job_methods then
local replace_job = core.async_job_methods.replace
function core.async_job_methods:replace(func, callback, ...)
local newjob = replace_job(self, prepare_async_args(func, callback, ...))
core.async_jobs[newjob:get_id()] = callback
return newjob
end
local dummy = function() end
function core.async_job_methods:cancel()
return self:get_id() == self:replace(dummy, dummy):get_id()
end
core.async_job_methods = nil
end

View File

@ -6689,6 +6689,16 @@ This allows you easy interoperability for delegating work to jobs.
* When `func` returns the callback is called (in the normal environment) * When `func` returns the callback is called (in the normal environment)
with all of the return values as arguments. with all of the return values as arguments.
* Optional: Variable number of arguments that are passed to `func` * Optional: Variable number of arguments that are passed to `func`
* Returns the ID of the async job.
* `minetest.replace_async(id, func, callback, ...)`:
* Try to replace the job with the given ID. The new job is otherwise queued
as if by `minetest.handle_async`.
* Note that the `func`, `callback` and variable arguments are all replaced.
* Returns the ID of the new async job. This is the same as the previous id
if the previous job was canceled.
* `minetest.cancel_async(id)`:
* Try to cancel the job with the given ID.
* Returns whether the job was canceled.
* `minetest.register_async_dofile(path)`: * `minetest.register_async_dofile(path)`:
* Register a path to a Lua file to be imported when an async environment * Register a path to a Lua file to be imported when an async environment
is initialized. You can use this to preload code which you can then call is initialized. You can use this to preload code which you can then call

View File

@ -207,3 +207,55 @@ local function test_vector_preserve(cb)
end, {vec}) end, {vec})
end end
unittests.register("test_async_vector", test_vector_preserve, {async=true}) unittests.register("test_async_vector", test_vector_preserve, {async=true})
local function test_async_job_replacement(cb)
local job = core.handle_async(function(x)
return x
end, function(ret)
return cb("Replaced async callback still run")
end, 1)
local newjob = job:replace(function(x)
return -x
end, function(ret)
if ret ~= -2 then
return cb("Wrong async value passed")
end
end, 2)
if job:get_id() ~= newjob:get_id() then
return cb("AsyncJob:replace sanity check failed")
end
job = core.handle_async(function(x)
return x
end, function()
return cb("Canceled async job run")
end)
if not job:cancel() then
return cb("core.cancel_async sanity check failed")
end
-- Try to replace a job that is already run. Do this by delaying the main thread by some time.
job = core.handle_async(function(x)
return x
end, function(ret)
if ret ~= 1 then
return cb("Wrong async value passed to old handler")
end
newjob = job:replace(function(x)
return -x
end, function(new_ret)
if new_ret ~= -2 then
return cb("Wrong async value passed to new handler")
end
cb()
end, 2)
if job:get_id() == newjob:get_id() then
return cb("AsyncJob:replace replaced a completed job")
end
if job:cancel() then
return cb("AsyncJob:replace canceled a completed job")
end
end, 1)
end
unittests.register("test_async_job_replacement", test_async_job_replacement, {async=true})

View File

@ -35,6 +35,16 @@ extern "C" {
#include "common/c_packer.h" #include "common/c_packer.h"
#include "lua_api/l_base.h" #include "lua_api/l_base.h"
LuaJobInfo::LuaJobInfo(std::string &&func, std::string &&params, const std::string &mod_origin):
function(func), params(params), mod_origin(mod_origin)
{}
LuaJobInfo::LuaJobInfo(std::string &&func, PackedValue *params, const std::string &mod_origin):
function(func), mod_origin(mod_origin)
{
params_ext.reset(params);
}
/******************************************************************************/ /******************************************************************************/
AsyncEngine::~AsyncEngine() AsyncEngine::~AsyncEngine()
{ {
@ -101,40 +111,64 @@ void AsyncEngine::addWorkerThread()
} }
/******************************************************************************/ /******************************************************************************/
u32 AsyncEngine::queueAsyncJob(std::string &&func, std::string &&params,
const std::string &mod_origin) u32 AsyncEngine::queueAsyncJob(LuaJobInfo &&job)
{ {
MutexAutoLock autolock(jobQueueMutex); MutexAutoLock autolock(jobQueueMutex);
u32 jobId = jobIdCounter++; u32 jobId = jobIdCounter++;
jobQueue.emplace_back(); job.id = jobId;
auto &to_add = jobQueue.back(); jobQueue.push_back(std::move(job));
to_add.id = jobId;
to_add.function = std::move(func);
to_add.params = std::move(params);
to_add.mod_origin = mod_origin;
jobQueueCounter.post(); jobQueueCounter.post();
return jobId; return jobId;
} }
u32 AsyncEngine::queueAsyncJob(std::string &&func, std::string &&params,
const std::string &mod_origin)
{
LuaJobInfo to_add(std::move(func), std::move(params), mod_origin);
return queueAsyncJob(std::move(to_add));
}
u32 AsyncEngine::queueAsyncJob(std::string &&func, PackedValue *params, u32 AsyncEngine::queueAsyncJob(std::string &&func, PackedValue *params,
const std::string &mod_origin) const std::string &mod_origin)
{ {
MutexAutoLock autolock(jobQueueMutex); LuaJobInfo to_add(std::move(func), params, mod_origin);
u32 jobId = jobIdCounter++; return queueAsyncJob(std::move(to_add));
}
jobQueue.emplace_back(); u32 AsyncEngine::replaceAsyncJob(const u32 &oldId, LuaJobInfo &&job)
auto &to_add = jobQueue.back(); {
to_add.id = jobId; MutexAutoLock autolock(jobQueueMutex);
to_add.function = std::move(func); int pos = oldId - (jobIdCounter - jobQueue.size());
to_add.params_ext.reset(params); u32 jobId = oldId;
to_add.mod_origin = mod_origin; if (pos < 0 || pos >= jobQueue.size()) {
job.id = jobId = jobIdCounter++;
jobQueue.push_back(std::move(job));
} else {
job.id = jobId;
jobQueue[pos] = std::move(job);
}
jobQueueCounter.post(); jobQueueCounter.post();
return jobId; return jobId;
} }
u32 AsyncEngine::replaceAsyncJob(const u32 &oldId, std::string &&func, std::string &&params,
const std::string &mod_origin)
{
LuaJobInfo to_add(std::move(func), std::move(params), mod_origin);
return replaceAsyncJob(oldId, std::move(to_add));
}
u32 AsyncEngine::replaceAsyncJob(const u32 &oldId, std::string &&func, PackedValue *params,
const std::string &mod_origin)
{
LuaJobInfo to_add(std::move(func), params, mod_origin);
return replaceAsyncJob(oldId, std::move(to_add));
}
/******************************************************************************/ /******************************************************************************/
bool AsyncEngine::getJob(LuaJobInfo *job) bool AsyncEngine::getJob(LuaJobInfo *job)
{ {
@ -389,3 +423,21 @@ void* AsyncWorkerThread::run()
return 0; return 0;
} }
u32 ScriptApiAsync::queueAsync(std::string &&serialized_func,
PackedValue *param, const std::string &mod_origin)
{
return asyncEngine.queueAsyncJob(std::move(serialized_func),
param, mod_origin);
}
u32 ScriptApiAsync::replaceAsync(const u32 &id, std::string &&serialized_func,
PackedValue *param, const std::string &mod_origin)
{
return asyncEngine.replaceAsyncJob(id, std::move(serialized_func),
param, mod_origin);
}
void ScriptApiAsync::stepAsync()
{
asyncEngine.step(getStack());
}

View File

@ -41,6 +41,8 @@ class AsyncEngine;
struct LuaJobInfo struct LuaJobInfo
{ {
LuaJobInfo() = default; LuaJobInfo() = default;
LuaJobInfo(std::string &&func, std::string &&params, const std::string &mod_origin = "");
LuaJobInfo(std::string &&func, PackedValue *params, const std::string &mod_origin = "");
// Function to be called in async environment (from string.dump) // Function to be called in async environment (from string.dump)
std::string function; std::string function;
@ -114,6 +116,26 @@ public:
u32 queueAsyncJob(std::string &&func, PackedValue *params, u32 queueAsyncJob(std::string &&func, PackedValue *params,
const std::string &mod_origin = ""); const std::string &mod_origin = "");
/**
* Replace an async job if possible or queue the job otherwise
* @param oldId The ID of the job to replace
* @param func Serialized lua function
* @param params Serialized parameters
* @return jobid The job is replaced or queued
*/
u32 replaceAsyncJob(const u32 &oldId, std::string &&func, std::string &&params,
const std::string &mod_origin = "");
/**
* Replace an async job if possible or queue the job otherwise
* @param oldId The ID of the job to replace
* @param func Serialized lua function
* @param params Serialized parameters (takes ownership!)
* @return ID of queued job
*/
u32 replaceAsyncJob(const u32 &oldId, std::string &&func, PackedValue *params,
const std::string &mod_origin = "");
/** /**
* Engine step to process finished jobs * Engine step to process finished jobs
* @param L The Lua stack * @param L The Lua stack
@ -129,6 +151,21 @@ protected:
*/ */
bool getJob(LuaJobInfo *job); bool getJob(LuaJobInfo *job);
/**
* Queue an async job
* @param job The job to queue (takes ownership!)
* @return Id of the queued job
*/
u32 queueAsyncJob(LuaJobInfo &&job);
/**
* Replace an async job if possible or queue the job otherwise
* @param jobId The Id of the job to replace
* @param job The new job to use in-place (takes ownership!)
* @return Id of the new job
*/
u32 replaceAsyncJob(const u32 &jobId, LuaJobInfo &&job);
/** /**
* Put a Job result back to result queue * Put a Job result back to result queue
* @param result result of completed job * @param result result of completed job
@ -195,3 +232,21 @@ private:
// Counter semaphore for job dispatching // Counter semaphore for job dispatching
Semaphore jobQueueCounter; Semaphore jobQueueCounter;
}; };
class ScriptApiAsync:
virtual public ScriptApiBase
{
public:
ScriptApiAsync(Server *server): asyncEngine(server) {}
virtual void initAsync() = 0;
void stepAsync();
u32 queueAsync(std::string &&serialized_func,
PackedValue *param, const std::string &mod_origin);
u32 replaceAsync(const u32 &id, std::string &&serialized_func,
PackedValue *param, const std::string &mod_origin);
protected:
AsyncEngine asyncEngine;
};

View File

@ -1,5 +1,6 @@
set(common_SCRIPT_LUA_API_SRCS set(common_SCRIPT_LUA_API_SRCS
${CMAKE_CURRENT_SOURCE_DIR}/l_areastore.cpp ${CMAKE_CURRENT_SOURCE_DIR}/l_areastore.cpp
${CMAKE_CURRENT_SOURCE_DIR}/l_async.cpp
${CMAKE_CURRENT_SOURCE_DIR}/l_auth.cpp ${CMAKE_CURRENT_SOURCE_DIR}/l_auth.cpp
${CMAKE_CURRENT_SOURCE_DIR}/l_base.cpp ${CMAKE_CURRENT_SOURCE_DIR}/l_base.cpp
${CMAKE_CURRENT_SOURCE_DIR}/l_craft.cpp ${CMAKE_CURRENT_SOURCE_DIR}/l_craft.cpp

View File

@ -0,0 +1,116 @@
#include "lua_api/l_internal.h"
#include "lua_api/l_async.h"
#include "cpp_api/s_async.h"
// garbage collector
int LuaAsyncJob::gc_object(lua_State *L)
{
LuaAsyncJob *o = *(LuaAsyncJob **)(lua_touserdata(L, 1));
delete o;
return 0;
}
// get_id() -> id
int LuaAsyncJob::l_get_id(lua_State *L)
{
NO_MAP_LOCK_REQUIRED;
LuaAsyncJob *o = checkObject<LuaAsyncJob>(L, 1);
lua_pushinteger(L, o->get_id());
return 1;
}
static std::string get_serialized_function(lua_State *L, int index)
{
luaL_checktype(L, index, LUA_TFUNCTION);
call_string_dump(L, index);
size_t func_length;
const char *serialized_func_raw = lua_tolstring(L, -1, &func_length);
return std::string(serialized_func_raw, func_length);
}
// replace(self, func, params, mod_origin) -> new_job
// This implements the part that is strictly needed for replacing the job.
// The actual LuaAsyncJob:replace used by mods is implemented in Lua based on this.
int LuaAsyncJob::l_replace(lua_State *L)
{
NO_MAP_LOCK_REQUIRED;
ScriptApiAsync *script = getScriptApi<ScriptApiAsync>(L);
luaL_checktype(L, 3, LUA_TTABLE);
luaL_checktype(L, 4, LUA_TSTRING);
LuaAsyncJob *o = checkObject<LuaAsyncJob>(L, 1);
u32 id = o->get_id();
auto serialized_func = get_serialized_function(L, 2);
PackedValue *param = script_pack(L, 3);
std::string mod_origin = readParam<std::string>(L, 4);
u32 jobId = script->replaceAsync(id,
std::move(serialized_func),
param, mod_origin);
lua_settop(L, 0);
create(L, jobId);
return 1;
}
int LuaAsyncJob::create(lua_State *L, const int id)
{
NO_MAP_LOCK_REQUIRED;
LuaAsyncJob *o = new LuaAsyncJob(id);
*(void **)(lua_newuserdata(L, sizeof(void *))) = o;
luaL_getmetatable(L, className);
lua_setmetatable(L, -2);
return 1;
}
void LuaAsyncJob::Register(lua_State *L)
{
static const luaL_Reg metamethods[] = {
{"__gc", gc_object},
{0, 0}
};
registerClass(L, className, methods, metamethods);
// Expose __index to be complemented by Lua.
lua_getglobal(L, "core");
luaL_getmetatable(L, className);
lua_getfield(L, -1, "__index");
lua_setfield(L, -3, "async_job_methods");
}
const char LuaAsyncJob::className[] = "AsyncJob";
const luaL_Reg LuaAsyncJob::methods[] = {
luamethod(LuaAsyncJob, get_id),
luamethod(LuaAsyncJob, replace),
{0, 0}
};
// do_async_callback(func, params, mod_origin)
int ModApiAsync::l_do_async_callback(lua_State *L)
{
NO_MAP_LOCK_REQUIRED;
ScriptApiAsync *script = getScriptApi<ScriptApiAsync>(L);
luaL_checktype(L, 2, LUA_TTABLE);
luaL_checktype(L, 3, LUA_TSTRING);
auto serialized_func = get_serialized_function(L, 1);
PackedValue *param = script_pack(L, 2);
std::string mod_origin = readParam<std::string>(L, 3);
u32 jobId = script->queueAsync(
std::move(serialized_func),
param, mod_origin);
lua_settop(L, 0);
LuaAsyncJob::create(L, jobId);
return 1;
}
void ModApiAsync::Initialize(lua_State *L, int top)
{
API_FCT(do_async_callback);
}

View File

@ -0,0 +1,44 @@
#pragma once
#include "lua_api/l_base.h"
#include "util/basic_macros.h"
class LuaAsyncJob : public ModApiBase
{
private:
const int id;
LuaAsyncJob(const int id): id(id) {}
static const luaL_Reg methods[];
// garbage collector
static int gc_object(lua_State *L);
// get_id(self) -> id
static int l_get_id(lua_State *L);
// replace(self, func, params, mod_origin) -> id; partially implemented in Lua
static int l_replace(lua_State *L);
// cancel(self) -> boolean: implemented in Lua
public:
DISABLE_CLASS_COPY(LuaAsyncJob)
int get_id() const { return id; }
static int create(lua_State *L, const int id);
static void Register(lua_State *L);
static const char className[];
};
class ModApiAsync : public ModApiBase
{
public:
static void Initialize(lua_State *L, int top);
private:
// do_async_callback(func, params, mod_origin)
static int l_do_async_callback(lua_State *L);
};

View File

@ -648,33 +648,6 @@ int ModApiServer::l_notify_authentication_modified(lua_State *L)
return 0; return 0;
} }
// do_async_callback(func, params, mod_origin)
int ModApiServer::l_do_async_callback(lua_State *L)
{
NO_MAP_LOCK_REQUIRED;
ServerScripting *script = getScriptApi<ServerScripting>(L);
luaL_checktype(L, 1, LUA_TFUNCTION);
luaL_checktype(L, 2, LUA_TTABLE);
luaL_checktype(L, 3, LUA_TSTRING);
call_string_dump(L, 1);
size_t func_length;
const char *serialized_func_raw = lua_tolstring(L, -1, &func_length);
PackedValue *param = script_pack(L, 2);
std::string mod_origin = readParam<std::string>(L, 3);
u32 jobId = script->queueAsync(
std::string(serialized_func_raw, func_length),
param, mod_origin);
lua_settop(L, 0);
lua_pushinteger(L, jobId);
return 1;
}
// register_async_dofile(path) // register_async_dofile(path)
int ModApiServer::l_register_async_dofile(lua_State *L) int ModApiServer::l_register_async_dofile(lua_State *L)
{ {
@ -770,7 +743,6 @@ void ModApiServer::Initialize(lua_State *L, int top)
API_FCT(unban_player_or_ip); API_FCT(unban_player_or_ip);
API_FCT(notify_authentication_modified); API_FCT(notify_authentication_modified);
API_FCT(do_async_callback);
API_FCT(register_async_dofile); API_FCT(register_async_dofile);
API_FCT(serialize_roundtrip); API_FCT(serialize_roundtrip);

View File

@ -115,9 +115,6 @@ private:
// notify_authentication_modified(name) // notify_authentication_modified(name)
static int l_notify_authentication_modified(lua_State *L); static int l_notify_authentication_modified(lua_State *L);
// do_async_callback(func, params, mod_origin)
static int l_do_async_callback(lua_State *L);
// register_async_dofile(path) // register_async_dofile(path)
static int l_register_async_dofile(lua_State *L); static int l_register_async_dofile(lua_State *L);

View File

@ -24,6 +24,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "filesys.h" #include "filesys.h"
#include "cpp_api/s_internal.h" #include "cpp_api/s_internal.h"
#include "lua_api/l_areastore.h" #include "lua_api/l_areastore.h"
#include "lua_api/l_async.h"
#include "lua_api/l_auth.h" #include "lua_api/l_auth.h"
#include "lua_api/l_base.h" #include "lua_api/l_base.h"
#include "lua_api/l_craft.h" #include "lua_api/l_craft.h"
@ -53,7 +54,7 @@ extern "C" {
ServerScripting::ServerScripting(Server* server): ServerScripting::ServerScripting(Server* server):
ScriptApiBase(ScriptingType::Server), ScriptApiBase(ScriptingType::Server),
asyncEngine(server) ScriptApiAsync(server)
{ {
setGameDef(server); setGameDef(server);
@ -128,24 +129,13 @@ void ServerScripting::initAsync()
asyncEngine.initialize(0); asyncEngine.initialize(0);
} }
void ServerScripting::stepAsync()
{
asyncEngine.step(getStack());
}
u32 ServerScripting::queueAsync(std::string &&serialized_func,
PackedValue *param, const std::string &mod_origin)
{
return asyncEngine.queueAsyncJob(std::move(serialized_func),
param, mod_origin);
}
void ServerScripting::InitializeModApi(lua_State *L, int top) void ServerScripting::InitializeModApi(lua_State *L, int top)
{ {
// Register reference classes (userdata) // Register reference classes (userdata)
InvRef::Register(L); InvRef::Register(L);
ItemStackMetaRef::Register(L); ItemStackMetaRef::Register(L);
LuaAreaStore::Register(L); LuaAreaStore::Register(L);
LuaAsyncJob::Register(L);
LuaItemStack::Register(L); LuaItemStack::Register(L);
LuaPerlinNoise::Register(L); LuaPerlinNoise::Register(L);
LuaPerlinNoiseMap::Register(L); LuaPerlinNoiseMap::Register(L);
@ -163,6 +153,7 @@ void ServerScripting::InitializeModApi(lua_State *L, int top)
ModChannelRef::Register(L); ModChannelRef::Register(L);
// Initialize mod api modules // Initialize mod api modules
ModApiAsync::Initialize(L, top);
ModApiAuth::Initialize(L, top); ModApiAuth::Initialize(L, top);
ModApiCraft::Initialize(L, top); ModApiCraft::Initialize(L, top);
ModApiEnv::Initialize(L, top); ModApiEnv::Initialize(L, top);

View File

@ -37,6 +37,7 @@ struct PackedValue;
class ServerScripting: class ServerScripting:
virtual public ScriptApiBase, virtual public ScriptApiBase,
public ScriptApiAsync,
public ScriptApiDetached, public ScriptApiDetached,
public ScriptApiEntity, public ScriptApiEntity,
public ScriptApiEnv, public ScriptApiEnv,
@ -58,17 +59,8 @@ public:
// Initialize async engine, call this AFTER loading all mods // Initialize async engine, call this AFTER loading all mods
void initAsync(); void initAsync();
// Global step handler to collect async results
void stepAsync();
// Pass job to async threads
u32 queueAsync(std::string &&serialized_func,
PackedValue *param, const std::string &mod_origin);
private: private:
void InitializeModApi(lua_State *L, int top); void InitializeModApi(lua_State *L, int top);
static void InitializeAsync(lua_State *L, int top); static void InitializeAsync(lua_State *L, int top);
AsyncEngine asyncEngine;
}; };