diff --git a/builtin/game/async.lua b/builtin/game/async.lua index 469f179d7..f85864649 100644 --- a/builtin/game/async.lua +++ b/builtin/game/async.lua @@ -8,15 +8,34 @@ function core.async_event_handler(jobid, retval) core.async_jobs[jobid] = nil end -function core.handle_async(func, callback, ...) +local function prepare_async_args(func, callback, ...) assert(type(func) == "function" and type(callback) == "function", - "Invalid minetest.handle_async invocation") + "Invalid invocation of minetest.handle_async or AsyncJob:replace") local args = {n = select("#", ...), ...} local mod_origin = core.get_last_run_mod() - local jobid = core.do_async_callback(func, args, mod_origin) - core.async_jobs[jobid] = callback - - return true + return func, args, mod_origin end +function core.handle_async(func, callback, ...) + local job = core.do_async_callback(prepare_async_args(func, callback, ...)) + core.async_jobs[job:get_id()] = callback + + return job +end + +if core.async_job_methods then + local replace_job = core.async_job_methods.replace + function core.async_job_methods:replace(func, callback, ...) + local newjob = replace_job(self, prepare_async_args(func, callback, ...)) + core.async_jobs[newjob:get_id()] = callback + return newjob + end + + local dummy = function() end + function core.async_job_methods:cancel() + return self:get_id() == self:replace(dummy, dummy):get_id() + end + + core.async_job_methods = nil +end diff --git a/doc/lua_api.md b/doc/lua_api.md index d338d23b2..5ab8196e7 100644 --- a/doc/lua_api.md +++ b/doc/lua_api.md @@ -6689,6 +6689,16 @@ This allows you easy interoperability for delegating work to jobs. * When `func` returns the callback is called (in the normal environment) with all of the return values as arguments. * Optional: Variable number of arguments that are passed to `func` + * Returns the ID of the async job. +* `minetest.replace_async(id, func, callback, ...)`: + * Try to replace the job with the given ID. The new job is otherwise queued + as if by `minetest.handle_async`. + * Note that the `func`, `callback` and variable arguments are all replaced. + * Returns the ID of the new async job. This is the same as the previous id + if the previous job was canceled. +* `minetest.cancel_async(id)`: + * Try to cancel the job with the given ID. + * Returns whether the job was canceled. * `minetest.register_async_dofile(path)`: * Register a path to a Lua file to be imported when an async environment is initialized. You can use this to preload code which you can then call diff --git a/games/devtest/mods/unittests/async_env.lua b/games/devtest/mods/unittests/async_env.lua index d7a714941..5e16a4339 100644 --- a/games/devtest/mods/unittests/async_env.lua +++ b/games/devtest/mods/unittests/async_env.lua @@ -207,3 +207,55 @@ local function test_vector_preserve(cb) end, {vec}) end unittests.register("test_async_vector", test_vector_preserve, {async=true}) + +local function test_async_job_replacement(cb) + local job = core.handle_async(function(x) + return x + end, function(ret) + return cb("Replaced async callback still run") + end, 1) + local newjob = job:replace(function(x) + return -x + end, function(ret) + if ret ~= -2 then + return cb("Wrong async value passed") + end + end, 2) + if job:get_id() ~= newjob:get_id() then + return cb("AsyncJob:replace sanity check failed") + end + + job = core.handle_async(function(x) + return x + end, function() + return cb("Canceled async job run") + end) + if not job:cancel() then + return cb("core.cancel_async sanity check failed") + end + + -- Try to replace a job that is already run. Do this by delaying the main thread by some time. + job = core.handle_async(function(x) + return x + end, function(ret) + if ret ~= 1 then + return cb("Wrong async value passed to old handler") + end + + newjob = job:replace(function(x) + return -x + end, function(new_ret) + if new_ret ~= -2 then + return cb("Wrong async value passed to new handler") + end + cb() + end, 2) + if job:get_id() == newjob:get_id() then + return cb("AsyncJob:replace replaced a completed job") + end + if job:cancel() then + return cb("AsyncJob:replace canceled a completed job") + end + end, 1) +end +unittests.register("test_async_job_replacement", test_async_job_replacement, {async=true}) diff --git a/src/script/cpp_api/s_async.cpp b/src/script/cpp_api/s_async.cpp index 75b1a8205..fc19dbb8b 100644 --- a/src/script/cpp_api/s_async.cpp +++ b/src/script/cpp_api/s_async.cpp @@ -35,6 +35,16 @@ extern "C" { #include "common/c_packer.h" #include "lua_api/l_base.h" +LuaJobInfo::LuaJobInfo(std::string &&func, std::string &¶ms, const std::string &mod_origin): + function(func), params(params), mod_origin(mod_origin) +{} + +LuaJobInfo::LuaJobInfo(std::string &&func, PackedValue *params, const std::string &mod_origin): + function(func), mod_origin(mod_origin) +{ + params_ext.reset(params); +} + /******************************************************************************/ AsyncEngine::~AsyncEngine() { @@ -101,40 +111,64 @@ void AsyncEngine::addWorkerThread() } /******************************************************************************/ -u32 AsyncEngine::queueAsyncJob(std::string &&func, std::string &¶ms, - const std::string &mod_origin) + +u32 AsyncEngine::queueAsyncJob(LuaJobInfo &&job) { MutexAutoLock autolock(jobQueueMutex); u32 jobId = jobIdCounter++; - jobQueue.emplace_back(); - auto &to_add = jobQueue.back(); - to_add.id = jobId; - to_add.function = std::move(func); - to_add.params = std::move(params); - to_add.mod_origin = mod_origin; + job.id = jobId; + jobQueue.push_back(std::move(job)); jobQueueCounter.post(); return jobId; } +u32 AsyncEngine::queueAsyncJob(std::string &&func, std::string &¶ms, + const std::string &mod_origin) +{ + LuaJobInfo to_add(std::move(func), std::move(params), mod_origin); + return queueAsyncJob(std::move(to_add)); +} + u32 AsyncEngine::queueAsyncJob(std::string &&func, PackedValue *params, const std::string &mod_origin) { - MutexAutoLock autolock(jobQueueMutex); - u32 jobId = jobIdCounter++; + LuaJobInfo to_add(std::move(func), params, mod_origin); + return queueAsyncJob(std::move(to_add)); +} - jobQueue.emplace_back(); - auto &to_add = jobQueue.back(); - to_add.id = jobId; - to_add.function = std::move(func); - to_add.params_ext.reset(params); - to_add.mod_origin = mod_origin; +u32 AsyncEngine::replaceAsyncJob(const u32 &oldId, LuaJobInfo &&job) +{ + MutexAutoLock autolock(jobQueueMutex); + int pos = oldId - (jobIdCounter - jobQueue.size()); + u32 jobId = oldId; + if (pos < 0 || pos >= jobQueue.size()) { + job.id = jobId = jobIdCounter++; + jobQueue.push_back(std::move(job)); + } else { + job.id = jobId; + jobQueue[pos] = std::move(job); + } jobQueueCounter.post(); return jobId; } +u32 AsyncEngine::replaceAsyncJob(const u32 &oldId, std::string &&func, std::string &¶ms, + const std::string &mod_origin) +{ + LuaJobInfo to_add(std::move(func), std::move(params), mod_origin); + return replaceAsyncJob(oldId, std::move(to_add)); +} + +u32 AsyncEngine::replaceAsyncJob(const u32 &oldId, std::string &&func, PackedValue *params, + const std::string &mod_origin) +{ + LuaJobInfo to_add(std::move(func), params, mod_origin); + return replaceAsyncJob(oldId, std::move(to_add)); +} + /******************************************************************************/ bool AsyncEngine::getJob(LuaJobInfo *job) { @@ -389,3 +423,21 @@ void* AsyncWorkerThread::run() return 0; } +u32 ScriptApiAsync::queueAsync(std::string &&serialized_func, + PackedValue *param, const std::string &mod_origin) +{ + return asyncEngine.queueAsyncJob(std::move(serialized_func), + param, mod_origin); +} + +u32 ScriptApiAsync::replaceAsync(const u32 &id, std::string &&serialized_func, + PackedValue *param, const std::string &mod_origin) +{ + return asyncEngine.replaceAsyncJob(id, std::move(serialized_func), + param, mod_origin); +} + +void ScriptApiAsync::stepAsync() +{ + asyncEngine.step(getStack()); +} diff --git a/src/script/cpp_api/s_async.h b/src/script/cpp_api/s_async.h index 1e34e40ea..691242dbc 100644 --- a/src/script/cpp_api/s_async.h +++ b/src/script/cpp_api/s_async.h @@ -41,6 +41,8 @@ class AsyncEngine; struct LuaJobInfo { LuaJobInfo() = default; + LuaJobInfo(std::string &&func, std::string &¶ms, const std::string &mod_origin = ""); + LuaJobInfo(std::string &&func, PackedValue *params, const std::string &mod_origin = ""); // Function to be called in async environment (from string.dump) std::string function; @@ -114,6 +116,26 @@ public: u32 queueAsyncJob(std::string &&func, PackedValue *params, const std::string &mod_origin = ""); + /** + * Replace an async job if possible or queue the job otherwise + * @param oldId The ID of the job to replace + * @param func Serialized lua function + * @param params Serialized parameters + * @return jobid The job is replaced or queued + */ + u32 replaceAsyncJob(const u32 &oldId, std::string &&func, std::string &¶ms, + const std::string &mod_origin = ""); + + /** + * Replace an async job if possible or queue the job otherwise + * @param oldId The ID of the job to replace + * @param func Serialized lua function + * @param params Serialized parameters (takes ownership!) + * @return ID of queued job + */ + u32 replaceAsyncJob(const u32 &oldId, std::string &&func, PackedValue *params, + const std::string &mod_origin = ""); + /** * Engine step to process finished jobs * @param L The Lua stack @@ -129,6 +151,21 @@ protected: */ bool getJob(LuaJobInfo *job); + /** + * Queue an async job + * @param job The job to queue (takes ownership!) + * @return Id of the queued job + */ + u32 queueAsyncJob(LuaJobInfo &&job); + + /** + * Replace an async job if possible or queue the job otherwise + * @param jobId The Id of the job to replace + * @param job The new job to use in-place (takes ownership!) + * @return Id of the new job + */ + u32 replaceAsyncJob(const u32 &jobId, LuaJobInfo &&job); + /** * Put a Job result back to result queue * @param result result of completed job @@ -195,3 +232,21 @@ private: // Counter semaphore for job dispatching Semaphore jobQueueCounter; }; + +class ScriptApiAsync: + virtual public ScriptApiBase +{ +public: + ScriptApiAsync(Server *server): asyncEngine(server) {} + + virtual void initAsync() = 0; + void stepAsync(); + + u32 queueAsync(std::string &&serialized_func, + PackedValue *param, const std::string &mod_origin); + u32 replaceAsync(const u32 &id, std::string &&serialized_func, + PackedValue *param, const std::string &mod_origin); + +protected: + AsyncEngine asyncEngine; +}; diff --git a/src/script/lua_api/CMakeLists.txt b/src/script/lua_api/CMakeLists.txt index d9405e4fe..dac9f40c8 100644 --- a/src/script/lua_api/CMakeLists.txt +++ b/src/script/lua_api/CMakeLists.txt @@ -1,5 +1,6 @@ set(common_SCRIPT_LUA_API_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/l_areastore.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/l_async.cpp ${CMAKE_CURRENT_SOURCE_DIR}/l_auth.cpp ${CMAKE_CURRENT_SOURCE_DIR}/l_base.cpp ${CMAKE_CURRENT_SOURCE_DIR}/l_craft.cpp diff --git a/src/script/lua_api/l_async.cpp b/src/script/lua_api/l_async.cpp new file mode 100644 index 000000000..e2c8ec7be --- /dev/null +++ b/src/script/lua_api/l_async.cpp @@ -0,0 +1,116 @@ +#include "lua_api/l_internal.h" +#include "lua_api/l_async.h" +#include "cpp_api/s_async.h" + +// garbage collector +int LuaAsyncJob::gc_object(lua_State *L) +{ + LuaAsyncJob *o = *(LuaAsyncJob **)(lua_touserdata(L, 1)); + delete o; + return 0; +} + +// get_id() -> id +int LuaAsyncJob::l_get_id(lua_State *L) +{ + NO_MAP_LOCK_REQUIRED; + LuaAsyncJob *o = checkObject(L, 1); + lua_pushinteger(L, o->get_id()); + return 1; +} + +static std::string get_serialized_function(lua_State *L, int index) +{ + luaL_checktype(L, index, LUA_TFUNCTION); + call_string_dump(L, index); + size_t func_length; + const char *serialized_func_raw = lua_tolstring(L, -1, &func_length); + return std::string(serialized_func_raw, func_length); +} + +// replace(self, func, params, mod_origin) -> new_job +// This implements the part that is strictly needed for replacing the job. +// The actual LuaAsyncJob:replace used by mods is implemented in Lua based on this. +int LuaAsyncJob::l_replace(lua_State *L) +{ + NO_MAP_LOCK_REQUIRED; + ScriptApiAsync *script = getScriptApi(L); + + luaL_checktype(L, 3, LUA_TTABLE); + luaL_checktype(L, 4, LUA_TSTRING); + + LuaAsyncJob *o = checkObject(L, 1); + u32 id = o->get_id(); + auto serialized_func = get_serialized_function(L, 2); + PackedValue *param = script_pack(L, 3); + std::string mod_origin = readParam(L, 4); + + u32 jobId = script->replaceAsync(id, + std::move(serialized_func), + param, mod_origin); + + lua_settop(L, 0); + create(L, jobId); + return 1; +} + +int LuaAsyncJob::create(lua_State *L, const int id) +{ + NO_MAP_LOCK_REQUIRED; + LuaAsyncJob *o = new LuaAsyncJob(id); + *(void **)(lua_newuserdata(L, sizeof(void *))) = o; + luaL_getmetatable(L, className); + lua_setmetatable(L, -2); + return 1; +} + +void LuaAsyncJob::Register(lua_State *L) +{ + static const luaL_Reg metamethods[] = { + {"__gc", gc_object}, + {0, 0} + }; + + registerClass(L, className, methods, metamethods); + + // Expose __index to be complemented by Lua. + lua_getglobal(L, "core"); + luaL_getmetatable(L, className); + lua_getfield(L, -1, "__index"); + lua_setfield(L, -3, "async_job_methods"); +} + +const char LuaAsyncJob::className[] = "AsyncJob"; +const luaL_Reg LuaAsyncJob::methods[] = { + luamethod(LuaAsyncJob, get_id), + luamethod(LuaAsyncJob, replace), + {0, 0} +}; + + +// do_async_callback(func, params, mod_origin) +int ModApiAsync::l_do_async_callback(lua_State *L) +{ + NO_MAP_LOCK_REQUIRED; + ScriptApiAsync *script = getScriptApi(L); + + luaL_checktype(L, 2, LUA_TTABLE); + luaL_checktype(L, 3, LUA_TSTRING); + + auto serialized_func = get_serialized_function(L, 1); + PackedValue *param = script_pack(L, 2); + std::string mod_origin = readParam(L, 3); + + u32 jobId = script->queueAsync( + std::move(serialized_func), + param, mod_origin); + + lua_settop(L, 0); + LuaAsyncJob::create(L, jobId); + return 1; +} + +void ModApiAsync::Initialize(lua_State *L, int top) +{ + API_FCT(do_async_callback); +} diff --git a/src/script/lua_api/l_async.h b/src/script/lua_api/l_async.h new file mode 100644 index 000000000..719a3cc90 --- /dev/null +++ b/src/script/lua_api/l_async.h @@ -0,0 +1,44 @@ +#pragma once + +#include "lua_api/l_base.h" +#include "util/basic_macros.h" + +class LuaAsyncJob : public ModApiBase +{ +private: + const int id; + + LuaAsyncJob(const int id): id(id) {} + + static const luaL_Reg methods[]; + + // garbage collector + static int gc_object(lua_State *L); + + // get_id(self) -> id + static int l_get_id(lua_State *L); + + // replace(self, func, params, mod_origin) -> id; partially implemented in Lua + static int l_replace(lua_State *L); + + // cancel(self) -> boolean: implemented in Lua + +public: + DISABLE_CLASS_COPY(LuaAsyncJob) + + int get_id() const { return id; } + + static int create(lua_State *L, const int id); + + static void Register(lua_State *L); + static const char className[]; +}; + +class ModApiAsync : public ModApiBase +{ +public: + static void Initialize(lua_State *L, int top); +private: + // do_async_callback(func, params, mod_origin) + static int l_do_async_callback(lua_State *L); +}; diff --git a/src/script/lua_api/l_server.cpp b/src/script/lua_api/l_server.cpp index af9a526e0..4bce37532 100644 --- a/src/script/lua_api/l_server.cpp +++ b/src/script/lua_api/l_server.cpp @@ -648,33 +648,6 @@ int ModApiServer::l_notify_authentication_modified(lua_State *L) return 0; } -// do_async_callback(func, params, mod_origin) -int ModApiServer::l_do_async_callback(lua_State *L) -{ - NO_MAP_LOCK_REQUIRED; - ServerScripting *script = getScriptApi(L); - - luaL_checktype(L, 1, LUA_TFUNCTION); - luaL_checktype(L, 2, LUA_TTABLE); - luaL_checktype(L, 3, LUA_TSTRING); - - call_string_dump(L, 1); - size_t func_length; - const char *serialized_func_raw = lua_tolstring(L, -1, &func_length); - - PackedValue *param = script_pack(L, 2); - - std::string mod_origin = readParam(L, 3); - - u32 jobId = script->queueAsync( - std::string(serialized_func_raw, func_length), - param, mod_origin); - - lua_settop(L, 0); - lua_pushinteger(L, jobId); - return 1; -} - // register_async_dofile(path) int ModApiServer::l_register_async_dofile(lua_State *L) { @@ -770,7 +743,6 @@ void ModApiServer::Initialize(lua_State *L, int top) API_FCT(unban_player_or_ip); API_FCT(notify_authentication_modified); - API_FCT(do_async_callback); API_FCT(register_async_dofile); API_FCT(serialize_roundtrip); diff --git a/src/script/lua_api/l_server.h b/src/script/lua_api/l_server.h index 505dce735..880a22953 100644 --- a/src/script/lua_api/l_server.h +++ b/src/script/lua_api/l_server.h @@ -115,9 +115,6 @@ private: // notify_authentication_modified(name) static int l_notify_authentication_modified(lua_State *L); - // do_async_callback(func, params, mod_origin) - static int l_do_async_callback(lua_State *L); - // register_async_dofile(path) static int l_register_async_dofile(lua_State *L); diff --git a/src/script/scripting_server.cpp b/src/script/scripting_server.cpp index 324850011..eaea5a57a 100644 --- a/src/script/scripting_server.cpp +++ b/src/script/scripting_server.cpp @@ -24,6 +24,7 @@ with this program; if not, write to the Free Software Foundation, Inc., #include "filesys.h" #include "cpp_api/s_internal.h" #include "lua_api/l_areastore.h" +#include "lua_api/l_async.h" #include "lua_api/l_auth.h" #include "lua_api/l_base.h" #include "lua_api/l_craft.h" @@ -53,7 +54,7 @@ extern "C" { ServerScripting::ServerScripting(Server* server): ScriptApiBase(ScriptingType::Server), - asyncEngine(server) + ScriptApiAsync(server) { setGameDef(server); @@ -128,24 +129,13 @@ void ServerScripting::initAsync() asyncEngine.initialize(0); } -void ServerScripting::stepAsync() -{ - asyncEngine.step(getStack()); -} - -u32 ServerScripting::queueAsync(std::string &&serialized_func, - PackedValue *param, const std::string &mod_origin) -{ - return asyncEngine.queueAsyncJob(std::move(serialized_func), - param, mod_origin); -} - void ServerScripting::InitializeModApi(lua_State *L, int top) { // Register reference classes (userdata) InvRef::Register(L); ItemStackMetaRef::Register(L); LuaAreaStore::Register(L); + LuaAsyncJob::Register(L); LuaItemStack::Register(L); LuaPerlinNoise::Register(L); LuaPerlinNoiseMap::Register(L); @@ -163,6 +153,7 @@ void ServerScripting::InitializeModApi(lua_State *L, int top) ModChannelRef::Register(L); // Initialize mod api modules + ModApiAsync::Initialize(L, top); ModApiAuth::Initialize(L, top); ModApiCraft::Initialize(L, top); ModApiEnv::Initialize(L, top); diff --git a/src/script/scripting_server.h b/src/script/scripting_server.h index 20d99313a..23f008b0c 100644 --- a/src/script/scripting_server.h +++ b/src/script/scripting_server.h @@ -37,6 +37,7 @@ struct PackedValue; class ServerScripting: virtual public ScriptApiBase, + public ScriptApiAsync, public ScriptApiDetached, public ScriptApiEntity, public ScriptApiEnv, @@ -58,17 +59,8 @@ public: // Initialize async engine, call this AFTER loading all mods void initAsync(); - // Global step handler to collect async results - void stepAsync(); - - // Pass job to async threads - u32 queueAsync(std::string &&serialized_func, - PackedValue *param, const std::string &mod_origin); - private: void InitializeModApi(lua_State *L, int top); static void InitializeAsync(lua_State *L, int top); - - AsyncEngine asyncEngine; };