Make database asynchronous
This commit is contained in:
153
colddb.lua
153
colddb.lua
@ -33,7 +33,9 @@ function colddb.get_db(directory)
|
||||
iterate_queue = {},
|
||||
indexes = false,
|
||||
add_to_mem_pool = true,
|
||||
async_pool = extended_api.Async.create_async_pool(),
|
||||
}
|
||||
extended_api.Async.priority(db.async_pool,150,250)
|
||||
-- make tables weak so the garbage-collector will remove unused data
|
||||
setmetatable(db.tags, {__mode = "kv"})
|
||||
setmetatable(db.mem_pool, {__mode = "kv"})
|
||||
@ -250,7 +252,7 @@ function colddb.delete_lines(db,_lines,tag_name)
|
||||
local copyfile = string.format("%s%sæIndex_table.cold.replacer",db.directory,t)
|
||||
local args = {db=db,cs=cs,oldfile=oldfile,copyfile=copyfile,do_not_skip_removed_items=true}
|
||||
db.indexes_pool[cs] = f
|
||||
colddb.iterate_index_table(db,delete_lines_func_begin,delete_lines_func_i,delete_lines_func_end,150,args,tag_name)
|
||||
colddb.iterate_index_table(db,delete_lines_func_begin,delete_lines_func_i,delete_lines_func_end,args,tag_name)
|
||||
end
|
||||
end
|
||||
|
||||
@ -374,21 +376,15 @@ function colddb.get_count(db,tag_name)
|
||||
return nil
|
||||
end
|
||||
|
||||
local function iterate(db,func_on_iterate,end_func,cycles_per_tick,count,cs,args)
|
||||
local function iterate(db,func_on_iterate,end_func,count,cs,args)
|
||||
local f = db.indexes_pool[cs]
|
||||
local fl = f.file
|
||||
for i=1,cycles_per_tick do
|
||||
if count < 1 then
|
||||
break
|
||||
end
|
||||
extended_api.Async.iterate(db.async_pool,1,count,function(i)
|
||||
local line = fl:read("*l")
|
||||
if args.do_not_skip_removed_items or not db.indexes_pool[cs].deleted_items[line] then
|
||||
func_on_iterate(line,i,args)
|
||||
end
|
||||
count = count - 1
|
||||
end
|
||||
db.indexes_pool[cs] = f
|
||||
if count < 1 then
|
||||
end,function()
|
||||
if end_func then
|
||||
end_func(args)
|
||||
end
|
||||
@ -405,19 +401,18 @@ local function iterate(db,func_on_iterate,end_func,cycles_per_tick,count,cs,args
|
||||
copy.args = a
|
||||
end
|
||||
end
|
||||
minetest.after(0,iterate,copy.db,copy.func_on_iterate,copy.end_func,copy.cycles_per_tick,copy.count,copy.cs,copy.args)
|
||||
minetest.after(0,iterate,copy.db,copy.func_on_iterate,copy.end_func,copy.count,copy.cs,copy.args)
|
||||
table.remove(db.iterate_queue[cs],1)
|
||||
return true
|
||||
return false
|
||||
else
|
||||
db.iterate_queue[cs] = nil
|
||||
end
|
||||
db.indexes_pool[cs].iterating = false
|
||||
return true
|
||||
end
|
||||
minetest.after(0,iterate,db,func_on_iterate,end_func,cycles_per_tick,count,cs,args)
|
||||
db.indexes_pool[cs].iterating = false
|
||||
return false
|
||||
end)
|
||||
end
|
||||
|
||||
function colddb.iterate_index_table(db,begin_func,func_on_iterate,end_func,cycles_per_tick,args,tag_name)
|
||||
function colddb.iterate_index_table(db,begin_func,func_on_iterate,end_func,args,tag_name)
|
||||
local t = ""
|
||||
local name = "æIndex_table"
|
||||
if tag_name then
|
||||
@ -455,7 +450,7 @@ function colddb.iterate_index_table(db,begin_func,func_on_iterate,end_func,cycle
|
||||
end
|
||||
end
|
||||
-- Start iterating the index table
|
||||
iterate(db,func_on_iterate,end_func,cycles_per_tick,c,cs,args)
|
||||
iterate(db,func_on_iterate,end_func,c,cs,args)
|
||||
elseif f and f.file then
|
||||
-- If its iterating some other function then add this one to the queue list
|
||||
fl:seek("set")
|
||||
@ -467,7 +462,7 @@ function colddb.iterate_index_table(db,begin_func,func_on_iterate,end_func,cycle
|
||||
if not db.iterate_queue[cs] then
|
||||
db.iterate_queue[cs] = {}
|
||||
end
|
||||
local _table = {db=db,begin_func=begin_func,func_on_iterate=func_on_iterate,end_func=end_func,cycles_per_tick=cycles_per_tick,count=c,cs=cs,tag_name=tag_name,args=args}
|
||||
local _table = {db=db,begin_func=begin_func,func_on_iterate=func_on_iterate,end_func=end_func,count=c,cs=cs,tag_name=tag_name,args=args}
|
||||
table.insert(db.iterate_queue[cs],_table)
|
||||
end
|
||||
end
|
||||
@ -494,14 +489,18 @@ function colddb.set(db,name,_table,tag_name)
|
||||
t = colddb.get_tag(db,tag_name)
|
||||
end
|
||||
if db.indexes and not colddb.file_Exists(db,name,tag_name) then
|
||||
local cs2 = string.format("%s%s",t,"æIndex_table")
|
||||
local om = db.indexes_pool[cs2]
|
||||
if not colddb.file_Exists(db,"æIndex_table",tag_name) or not (om and om.file) then
|
||||
colddb.open_index_table(db,tag_name)
|
||||
end
|
||||
minetest.after(0,function(db,name,tag_name)colddb.append_index_table(db,name,tag_name)end,db,name,tag_name)
|
||||
extended_api.Async.queue_task(db.async_pool,function()
|
||||
local cs2 = string.format("%s%s",t,"æIndex_table")
|
||||
local om = db.indexes_pool[cs2]
|
||||
if not colddb.file_Exists(db,"æIndex_table",tag_name) or not (om and om.file) then
|
||||
colddb.open_index_table(db,tag_name)
|
||||
end
|
||||
colddb.append_index_table(db,name,tag_name)
|
||||
end)
|
||||
end
|
||||
minetest.after(1,function(db,name, _table,tag_name)colddb.save_table(db,name, _table,tag_name)end,db,name, _table,tag_name)
|
||||
extended_api.Async.queue_task(db.async_pool,function()
|
||||
colddb.save_table(db,name, _table,tag_name)
|
||||
end)
|
||||
if db.add_to_mem_pool then
|
||||
load_into_mem(db,name,_table,tag_name)
|
||||
end
|
||||
@ -513,55 +512,63 @@ function colddb.set_key(db,name,tag_name)
|
||||
t = colddb.get_tag(db,tag_name)
|
||||
end
|
||||
if db.indexes and not colddb.file_Exists(db,name,tag_name) then
|
||||
local cs2 = string.format("%s%s",t,"æIndex_table")
|
||||
local om = db.indexes_pool[cs2]
|
||||
if not colddb.file_Exists(db,"æIndex_table",tag_name) or not (om and om.file) then
|
||||
colddb.open_index_table(db,tag_name)
|
||||
end
|
||||
minetest.after(0,function(db,name,tag_name)colddb.append_index_table(db,name,tag_name)end,db,name,tag_name)
|
||||
extended_api.Async.queue_task(db.async_pool,function()
|
||||
local cs2 = string.format("%s%s",t,"æIndex_table")
|
||||
local om = db.indexes_pool[cs2]
|
||||
if not colddb.file_Exists(db,"æIndex_table",tag_name) or not (om and om.file) then
|
||||
colddb.open_index_table(db,tag_name)
|
||||
end
|
||||
colddb.append_index_table(db,name,tag_name)
|
||||
end)
|
||||
end
|
||||
minetest.after(1,function(db,name, tag_name)colddb.save_key(db,name, tag_name)end,db,name, tag_name)
|
||||
extended_api.Async.queue_task(db.async_pool,function()
|
||||
colddb.save_key(db,name, tag_name)
|
||||
end)
|
||||
if db.add_to_mem_pool then
|
||||
load_into_mem(db,name,"",tag_name)
|
||||
end
|
||||
end
|
||||
|
||||
function colddb.get(db,name,tag_name)
|
||||
local t = ""
|
||||
if tag_name then
|
||||
t = colddb.get_tag(db,tag_name)
|
||||
end
|
||||
local cs = string.format("%s%s",t,name)
|
||||
local pm = db.mem_pool[cs]
|
||||
if pm then
|
||||
return pm.mem
|
||||
else
|
||||
local _table = colddb.load_table(db,name,tag_name)
|
||||
if _table then
|
||||
load_into_mem(db,name,_table,tag_name)
|
||||
return _table
|
||||
function colddb.get(db,name,tag_name,callback)
|
||||
extended_api.Async.queue_task(db.async_pool,function()
|
||||
local t = ""
|
||||
if tag_name then
|
||||
t = colddb.get_tag(db,tag_name)
|
||||
end
|
||||
end
|
||||
return nil
|
||||
local cs = string.format("%s%s",t,name)
|
||||
local pm = db.mem_pool[cs]
|
||||
if pm then
|
||||
return pm.mem
|
||||
else
|
||||
local _table = colddb.load_table(db,name,tag_name)
|
||||
if _table then
|
||||
load_into_mem(db,name,_table,tag_name)
|
||||
return _table
|
||||
end
|
||||
end
|
||||
return nil
|
||||
end,callback)
|
||||
end
|
||||
|
||||
function colddb.get_key(db,name,tag_name)
|
||||
local t = ""
|
||||
if tag_name then
|
||||
t = colddb.get_tag(db,tag_name)
|
||||
end
|
||||
local cs = string.format("%s%s",t,name)
|
||||
local pm = db.mem_pool[cs]
|
||||
if pm then
|
||||
return true
|
||||
else
|
||||
local bool = colddb.load_key(db,name,tag_name)
|
||||
if bool then
|
||||
load_into_mem(db,name,bool,tag_name)
|
||||
return bool
|
||||
function colddb.get_key(db,name,tag_name,callback)
|
||||
extended_api.Async.queue_task(db.async_pool,function()
|
||||
local t = ""
|
||||
if tag_name then
|
||||
t = colddb.get_tag(db,tag_name)
|
||||
end
|
||||
end
|
||||
return nil
|
||||
local cs = string.format("%s%s",t,name)
|
||||
local pm = db.mem_pool[cs]
|
||||
if pm then
|
||||
return true
|
||||
else
|
||||
local bool = colddb.load_key(db,name,tag_name)
|
||||
if bool then
|
||||
load_into_mem(db,name,bool,tag_name)
|
||||
return bool
|
||||
end
|
||||
end
|
||||
return nil
|
||||
end,callback)
|
||||
end
|
||||
|
||||
function colddb.remove(db,name,tag_name)
|
||||
@ -574,11 +581,15 @@ function colddb.remove(db,name,tag_name)
|
||||
db.mem_pool[cs] = nil
|
||||
end
|
||||
if db.indexes and colddb.file_Exists(db,"æIndex_table",tag_name) then
|
||||
local cs2 = string.format("%s%s",t,"æIndex_table")
|
||||
if not (db.indexes_pool[cs2] and db.indexes_pool[cs2].file) then
|
||||
colddb.open_index_table(db,tag_name)
|
||||
end
|
||||
colddb.delete_lines(db,name,tag_name)
|
||||
extended_api.Async.queue_task(db.async_pool,function()
|
||||
local cs2 = string.format("%s%s",t,"æIndex_table")
|
||||
if not (db.indexes_pool[cs2] and db.indexes_pool[cs2].file) then
|
||||
colddb.open_index_table(db,tag_name)
|
||||
end
|
||||
colddb.delete_lines(db,name,tag_name)
|
||||
end)
|
||||
end
|
||||
minetest.after(0,function(db,name,tag_name)colddb.delete_file(db,name,tag_name)end,db,name,tag_name)
|
||||
extended_api.Async.queue_task(db.async_pool,function()
|
||||
colddb.delete_file(db,name,tag_name)
|
||||
end)
|
||||
end
|
Reference in New Issue
Block a user