mirror of
				https://github.com/luanti-org/luanti.git
				synced 2025-10-25 05:35:25 +02:00 
			
		
		
		
	Warn if async engine seems stuck (#16010)
This commit is contained in:
		| @@ -24,6 +24,11 @@ extern "C" { | ||||
| #endif | ||||
| #include "lua_api/l_base.h" | ||||
| 
 | ||||
| // if a job is waiting for this duration, an additional thread will be spawned
 | ||||
| static constexpr int AUTOSCALE_DELAY_MS = 1000; | ||||
| // if jobs are waiting for this duration, a warning is printed
 | ||||
| static constexpr int STUCK_DELAY_MS = 11500; | ||||
| 
 | ||||
| /******************************************************************************/ | ||||
| AsyncEngine::~AsyncEngine() | ||||
| { | ||||
| @@ -156,6 +161,7 @@ void AsyncEngine::step(lua_State *L) | ||||
| { | ||||
| 	stepJobResults(L); | ||||
| 	stepAutoscale(); | ||||
| 	stepStuckWarning(); | ||||
| } | ||||
| 
 | ||||
| void AsyncEngine::stepJobResults(lua_State *L) | ||||
| @@ -203,11 +209,9 @@ void AsyncEngine::stepAutoscale() | ||||
| 	if (autoscaleTimer && porting::getTimeMs() >= autoscaleTimer) { | ||||
| 		autoscaleTimer = 0; | ||||
| 		// Determine overlap with previous snapshot
 | ||||
| 		unsigned int n = 0; | ||||
| 		for (const auto &it : jobQueue) | ||||
| 			n += autoscaleSeenJobs.count(it.id); | ||||
| 		autoscaleSeenJobs.clear(); | ||||
| 		infostream << "AsyncEngine: " << n << " jobs were still waiting after 1s" << std::endl; | ||||
| 		size_t n = compareJobs(autoscaleSeenJobs); | ||||
| 		infostream << "AsyncEngine: " << n << " jobs were still waiting after " | ||||
| 			<< AUTOSCALE_DELAY_MS << "ms, adding more threads." << std::endl; | ||||
| 		// Start this many new threads
 | ||||
| 		while (workerThreads.size() < autoscaleMaxWorkers && n > 0) { | ||||
| 			addWorkerThread(); | ||||
| @@ -216,13 +220,34 @@ void AsyncEngine::stepAutoscale() | ||||
| 		return; | ||||
| 	} | ||||
| 
 | ||||
| 	// 1) Check if there's anything in the queue
 | ||||
| 	// 1) Check queue contents
 | ||||
| 	if (!autoscaleTimer && !jobQueue.empty()) { | ||||
| 		// Take a snapshot of all jobs we have seen
 | ||||
| 		for (const auto &it : jobQueue) | ||||
| 			autoscaleSeenJobs.emplace(it.id); | ||||
| 		// and set a timer for 1 second
 | ||||
| 		autoscaleTimer = porting::getTimeMs() + 1000; | ||||
| 		autoscaleSeenJobs.clear(); | ||||
| 		snapshotJobs(autoscaleSeenJobs); | ||||
| 		autoscaleTimer = porting::getTimeMs() + AUTOSCALE_DELAY_MS; | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| void AsyncEngine::stepStuckWarning() | ||||
| { | ||||
| 	MutexAutoLock autolock(jobQueueMutex); | ||||
| 
 | ||||
| 	// 2) If the timer elapsed, check again
 | ||||
| 	if (stuckTimer && porting::getTimeMs() >= stuckTimer) { | ||||
| 		stuckTimer = 0; | ||||
| 		size_t n = compareJobs(stuckSeenJobs); | ||||
| 		if (n > 0) { | ||||
| 			warningstream << "AsyncEngine: " << n << " jobs seem to be stuck in queue" | ||||
| 				" (" << workerThreads.size() << " workers active)" << std::endl; | ||||
| 		} | ||||
| 		// fallthrough
 | ||||
| 	} | ||||
| 
 | ||||
| 	// 1) Check queue contents
 | ||||
| 	if (!stuckTimer && !jobQueue.empty()) { | ||||
| 		stuckSeenJobs.clear(); | ||||
| 		snapshotJobs(stuckSeenJobs); | ||||
| 		stuckTimer = porting::getTimeMs() + STUCK_DELAY_MS; | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
|   | ||||
| @@ -138,6 +138,11 @@ protected: | ||||
| 	 */ | ||||
| 	void stepAutoscale(); | ||||
| 
 | ||||
| 	/**
 | ||||
| 	 * Print warning message if too many jobs are stuck | ||||
| 	 */ | ||||
| 	void stepStuckWarning(); | ||||
| 
 | ||||
| 	/**
 | ||||
| 	 * Initialize environment with current registred functions | ||||
| 	 *  this function adds all functions registred by registerFunction to the | ||||
| @@ -149,6 +154,21 @@ protected: | ||||
| 	bool prepareEnvironment(lua_State* L, int top); | ||||
| 
 | ||||
| private: | ||||
| 	template <typename T> | ||||
| 	inline void snapshotJobs(T &to) | ||||
| 	{ | ||||
| 		for (const auto &it : jobQueue) | ||||
| 			to.emplace(it.id); | ||||
| 	} | ||||
| 	template <typename T> | ||||
| 	inline size_t compareJobs(const T &from) | ||||
| 	{ | ||||
| 		size_t overlap = 0; | ||||
| 		for (const auto &it : jobQueue) | ||||
| 			overlap += from.count(it.id); | ||||
| 		return overlap; | ||||
| 	} | ||||
| 
 | ||||
| 	// Variable locking the engine against further modification
 | ||||
| 	bool initDone = false; | ||||
| 
 | ||||
| @@ -158,6 +178,9 @@ private: | ||||
| 	u64 autoscaleTimer = 0; | ||||
| 	std::unordered_set<u32> autoscaleSeenJobs; | ||||
| 
 | ||||
| 	u64 stuckTimer = 0; | ||||
| 	std::unordered_set<u32> stuckSeenJobs; | ||||
| 
 | ||||
| 	// Only set for the server async environment (duh)
 | ||||
| 	Server *server = nullptr; | ||||
| 
 | ||||
|   | ||||
		Reference in New Issue
	
	Block a user