I have a simple thread pool of [default] 20 threads running. They read
a task from a queue and execute it and then go back to waiting on the
queue. Task popped from the queue contains a block and gets executed
within thread pool.
Code is here:
require “thread”
class WorkData
attr_accessor :data,:block,:job_key
def initialize(args,job_key,&block)
@data = args
@job_key = job_key
@block = block
end
end
class ThreadPool
attr_accessor :size,:threads,:work_queue
def initialize(size)
@size = size
@threads = []
@work_queue = Queue.new
puts Time.now
@size.times { add_thread }
puts “all threads started: #{Time.now}”
end
def defer(*args,&block)
job_key = Thread.current[:job_key]
@work_queue << WorkData.new(args,job_key,&block)
end
def add_thread
@threads << Thread.new do
Thread.current[:job_key] = nil
while true
task = @work_queue.pop
Thread.current[:job_key] = task.job_key
block_result = run_task(task)
end
end
end
def run_task task
block_arity = task.block.arity
begin
t_data = task.data
result = nil
if block_arity != 0
result = t_data.is_a?(Array) ? (task.block.call(*t_data)) :
(task.block.call(t_data))
else
result = task.block.call
end
return result
rescue
return nil
end
end
end
class SomeLoop
attr_accessor :thread_pool
def initialize
@thread_pool = ThreadPool.new(20)
end
def start
count = 0
counter = 0
Signal.trap(“USR1”) { request_generator }
loop do
if count < 600
make_requests()
sleep(0.05)
count += 1
end
sleep(0.005)
if (counter%100 == 0 && length != 0)
puts “Queue Length: #{length}”
puts “Count : #{count}”
elsif(length == 0 && count >= 600)
puts “All tasks has been executed”
end
counter += 1
end
end
def request_generator
puts “lets make request”
600.times {
make_requests()
sleep(0.05)
}
end
def length
thread_pool.work_queue.length
end
def some_job count
sleep 0.8
end
def make_requests
100.times { |i|
thread_pool.defer(i,&method(:some_job))
}
end
end
Thread.abort_on_exception = true
a = SomeLoop.new
a.start
Its being used in a live application and I have managed to extract the
problematic part.
Now the problem is, as tasks get added in the queue, my program starts
using more memory and thats fair. But even after all the tasks are
popped out from the queue the memory stays the same and further
addition of tasks leads to more memory usage and gets eventually
restarted by monitoring app.
You can simulate the scenario by running above program and by sending
“USR1” signal to it. Memory usage just keeps climbing.
I have tried detecting the leak with bleakhouse and can paste the dump
if useful.
I have also tried using ‘dike’, but couldn’t get any meaningful dump (
Ara, may be you can throw some pointers).
Any ideas?