Non-blocking communication between Ruby processes

Hi, I run Unicorn, which is a Rack HTTP server using N forked worker
processes.
I need the following:

  • When a worker processes an HTTP request it must send some data to
    another independent Ruby process XXX (separate from Unicorn).

  • This communication must be non-blocking, that is, the Unicorn worker
    process sends the notification and doesn’t wait for a response from
    process XXX, so the Unicorn worker can immediately generate the HTTP
    response, send it back to the client, and become free to handle new
    HTTP requests.

  • The Ruby process XXX should use some kind of queue system to store
    notifications and handle them. In fact, it should take them
    periodically and send them via TCP (but not HTTP) to another server.

Which is the best approach to design such communication? Perhaps using
something like EventMachine for the XXX process and Unix/TCP socket
communication between the Unicorn processes and the XXX process? Any
other alternative or suggestion?

Thanks a lot.

On 01/07/2010 02:18 PM, Iñaki Baz C. wrote:

Thanks a lot.
I would probably first try a simple setup: make process XXX publish a
Queue via DRb on a well-known port and have one or more threads fetching
from the queue and processing the data. If you fear resource exhaustion,
you can limit the queue size. E.g.:

x.rb server
c.rb client

robert@fussel:~$ cat x.rb
#!/usr/local/bin/ruby19

require 'thread'
require 'drb'

QUEUE_SIZE = 1024
THREAD_COUNT = 5
URI = "druby://localhost:8787"

QUEUE = SizedQueue.new QUEUE_SIZE

threads = (1..THREAD_COUNT).map do
  Thread.new do
    while msg = QUEUE.deq
      p msg
    end
  end
end

DRb.start_service(URI, QUEUE)
DRb.thread.join

robert@fussel:~$ cat c.rb
#!/usr/local/bin/ruby19

require 'drb/drb'
require 'benchmark'

SERVER_URI = "druby://localhost:8787"

QUEUE = DRbObject.new_with_uri(SERVER_URI)

10.times do |i|
  puts Benchmark.times do
    QUEUE.enq(sprintf("msg %4d at %-20s", i, Time.now))
  end
end
robert@fussel:~$

Of course you could also use a named pipe for the communication. But
then demarcating message boundaries might be more difficult, etc.

Kind regards

robert

On Thursday, 7 January 2010, Robert K. wrote:

Really, thanks a lot.
Just a question: is DRb good enough performance-wise?

On 01/07/2010 03:07 PM, Iñaki Baz C. wrote:

Really, thanks a lot.
Just a question: is DRb good enough performance-wise?

I don’t know about your requirements. Just try it out - you can start
multiple clients and vary the number of threads and the queue size in
the server at will. To me it seemed pretty fast. I did

$ for i in 1 2 3 4 5 6 7 8 9 10; do ./c.rb & done

and messages came through really fast. Also note that each client prints
timings, so you can see how fast it is on your machine.

If you need more performance then I’m sure you’ll find a Ruby binding to
any of the queuing frameworks like GNU Queue, NQS and whatnot. But I’d
start with the simple DRb-based solution. It’s easily done, and you have
everything you need without installing extra software, not even gems.
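To see the DRb queue pattern end to end without two terminals, here is a
minimal self-contained sketch (my own illustration, not the code from
the thread). Port 0 makes the OS pick a free port; the real setup would
use a fixed, well-known port with server and clients in separate
processes:

```ruby
require 'drb/drb'

# The server side (process XXX) publishes a plain in-memory Queue.
queue = Queue.new
DRb.start_service("druby://localhost:0", queue)

# A "client" (Unicorn worker) talks to that queue through a DRb proxy,
# exactly as c.rb does with a fixed URI.
remote = DRbObject.new_with_uri(DRb.uri)
5.times { |i| remote.enq("msg #{i}") }

# The server side drains the very same queue object.
received = Array.new(5) { queue.deq }
DRb.stop_service
```

Each `remote.enq` is a synchronous DRb call over TCP, so by the time the
loop finishes all five messages are sitting in the server’s queue, in
order.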

I just noticed there was a bug in my code: I used Benchmark.times, which
prints timings of the current process. What I meant was
Benchmark.measure. I have changed the code a bit so you can easily
experiment with queue sizes, thread counts and message counts (see
below).

With this command line

t=10; for i in `seq 1 $t`; do ./c.rb 10000 >"cl-$i" & done; for i in `seq 1 $t`; do wait; done; cat cl-*

I get pretty good timings of 7.6 ms/msg with unlimited queue size and
the default thread count (5) for this unrealistic test in which the
queue is hammered.

Kind regards

robert

Modified code:

robert@fussel:~$ cat x.rb
#!/usr/local/bin/ruby19

require 'thread'
require 'drb'

THREAD_COUNT = (ARGV.shift || 5).to_i
QUEUE_SIZE = ARGV.shift

printf "%4d threads, queue size=%p\n", THREAD_COUNT, QUEUE_SIZE

URI = "druby://localhost:8787"

Thread.abort_on_exception = true

QUEUE = QUEUE_SIZE ? SizedQueue.new(QUEUE_SIZE.to_i) : Queue.new

QUEUE.extend DRb::DRbUndumped

threads = (1..THREAD_COUNT).map do |i|
  Thread.new i do |id|
    while msg = QUEUE.deq
      printf "thread %2d: %p\n", id, msg
    end
  end
end

DRb.start_service(URI, QUEUE)
puts 'Started'
DRb.thread.join
puts 'Returned'
threads.each {|th| th.join rescue nil}
puts 'Done'

robert@fussel:~$

robert@fussel:~$ cat c.rb
#!/usr/local/bin/ruby19

require 'drb/drb'
require 'benchmark'

SERVER_URI = "druby://localhost:8787"

rep = (ARGV.shift || 20).to_i

QUEUE = DRb::DRbObject.new_with_uri(SERVER_URI)

QUEUE.enq "Started client"

Benchmark.bm 20 do |b|
  b.report "client %4d" % $$ do
    rep.times do |i|
      QUEUE.enq(sprintf("client %4d msg %4d at %-20s", $$, i, Time.now))
    end
  end
end

QUEUE.enq "Stopped client"

robert@fussel:~$

On Thursday, 7 January 2010, Robert K. wrote:

I’d start with the simple DRb based solution. It’s easily done, you have
everything you need and do not need to install extra software, not even
gems.

Thanks a lot. I’ve tried code similar to the one in “The Messaging
Interface”.

It uses a pipe file (of course there is no queue at all).

Well, sending 100000 strings (in a loop) it takes 2-3 seconds to receive
and print all the received data.
However, using the DRb solution it just didn’t finish (I had to
interrupt the process after 30 seconds due to CPU usage).

I’d like a simple solution. Using DRb could be nice, but using a pipe
file seems simpler and faster. The doubt I have now is about how safe a
pipe is. Could it leak memory if some process dies or the reader process
is not fast enough to handle the received data?

I get pretty good timings of 7.6 ms/msg with unlimited queue size and
the default thread count (5) for this unrealistic test in which the
queue is hammered.

Really thanks a lot, I’ll try it.

On 07.01.2010 18:58, Iñaki Baz C. wrote:

Hummm, I have a reader process and a writer process.
The writer process writes into the pipe file.
If I kill the reader process then the writer process keeps writing into
the pipe, and the data is stored (in the filesystem?).

So there is the leaking problem… I must investigate it a bit more…

pipe.write unless pipe.full?

i.e. check if your pipe hits a set limit on disk, and generate an
exception if the pipe_file reaches (or is close to reaching) the limit.

You could then buffer the data to be written until an additional (or
new) reading thread has started.

On Thursday, 7 January 2010, Iñaki Baz C. wrote:

The doubt I have now is about how safe a pipe is.
Could it leak memory if some process dies or the reader process is not
fast enough to handle the received data?

Hummm, I have a reader process and a writer process.
The writer process writes into the pipe file.
If I kill the reader process then the writer process keeps writing into
the pipe, and the data is stored (in the filesystem?).

So there is the leaking problem… I must investigate it a bit more…

Thanks a lot.

On 01/07/2010 06:58 PM, Iñaki Baz C. wrote:

On Thursday, 7 January 2010, Iñaki Baz C. wrote:

The doubt I have now is about how safe a pipe is.
Could it leak memory if some process dies or the reader process is not
fast enough to handle the received data?

Hummm, I have a reader process and a writer process.

I thought you had multiple writers. Didn’t you mention multiple forked
Rack handlers?

The writer process writes into the pipe file.
If I kill the reader process then the writer process keeps writing into
the pipe, and the data is stored (in the filesystem?).

So there is the leaking problem…

Not exactly: the writer is blocked. You can try this out:

robert@fussel:~$ mkfifo ff
robert@fussel:~$ ls -lF ff
prw-r--r-- 1 robert robert 0 2010-01-07 19:25 ff|
robert@fussel:~$ ruby19 -e 'puts("+"*10_000)' > ff
^Z
[1]+ Stopped ruby19 -e 'puts("+"*10_000)' > ff
robert@fussel:~$ wc ff &
[2] 14036
robert@fussel:~$ %1
ruby19 -e 'puts("+"*10_000)' > ff
robert@fussel:~$ 1 1 10001 ff

[2]+ Done wc ff
robert@fussel:~$ jobs
robert@fussel:~$

At the point where I pressed Ctrl-Z the writer hung because the pipe was
full. (The size of a pipe is usually the memory page size of the OS,
IIRC; this would be 4k in the case of 32-bit Linux.)

I must investigate it a bit more…

I’d personally prefer to use the DRb approach because then you can
actually send typed messages, i.e. whatever information you need. Also,
it was fun to play around with those small test programs. ;-) And you
can have the reader run on any machine in the network.

Whatever you do, you have to decide how to handle the situation when the
reader goes away - for whatever reason. You could write your messages
to a file and use an approach like “tail -f” uses to read them. But
this has the nasty effect of clobbering the file system, plus if the
reader goes away the file might grow arbitrarily large. And you have
locking issues. Using an in-memory pipe (e.g. mkfifo or via DRb) is
preferable IMHO. Then you can still decide in the client what to do if
you cannot get rid of the message.

Thanks a lot.

You’re welcome.

Kind regards

robert

On Thursday, 7 January 2010, Iñaki Baz C. wrote:

The doubt I have now is about how safe a pipe is.
Could it leak memory if some process dies or the reader process is not
fast enough to handle the received data?

Hummm, I have a reader process and a writer process.
The writer process writes into the pipe file.
If I kill the reader process then the writer process keeps writing into
the pipe, and the data is stored (in the filesystem?).

So there is the leaking problem… I must investigate it a bit more…

OK, the FIFO keeps working at the OS level, so it can receive messages
until some OS buffer capacity is filled. Then the writer process blocks
when trying to “flush” the data.
Fortunately, it only blocks at the Ruby thread level, so other threads
can keep working.
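A quick way to convince yourself of this (a sketch of my own, using an
anonymous IO.pipe in place of the mkfifo FIFO): overflow the pipe from a
dedicated thread and watch the rest of the process stay responsive:

```ruby
# A write that overflows the pipe buffer blocks only the writing
# thread; the main thread keeps running. IO.pipe stands in for the
# FIFO to keep the sketch self-contained.
r, w = IO.pipe

writer = Thread.new do
  w.write("+" * 200_000)   # far more than the usual 64 KB pipe buffer
end

sleep 0.1                     # give the writer time to fill the pipe
still_blocked = writer.alive? # true: the writer is stuck in write()

r.read(200_000)               # draining the pipe unblocks the writer
writer.join
w.close
r.close
```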

On Thursday, 7 January 2010, Robert K. wrote:

I thought you had multiple writers. Didn’t you mention multiple forked
Rack handlers?

Yes, that’s true. Surely I’ll get into problems when writing to the
FIFO from various clients at the same time :)
But for that I could create as many FIFOs as there are Rack workers…


Whatever you do, you have to decide how to go about the situation when
the reader goes away - for whatever reasons.

It’s realtime info, so if the reader dies it’s not that important to
recover the information when it starts again. Well, it would be nice to
recover just the last 5-10 minutes of it, but no more.

You could write your messages to a file and use an approach like
“tail -f” uses to read them. But this has the nasty effect of
clobbering the file system, plus if the reader goes away the file might
grow arbitrarily large. And you have locking issues. Using an
in-memory pipe (e.g. mkfifo or via DRb) is preferable IMHO. Then you
can still decide in the client what to do if you cannot get rid of the
message.

Yes, I must think a bit about it :)

Thanks a lot for your help.

On Thursday, 7 January 2010, Phillip G. wrote:

pipe.write unless pipe.full?

Unfortunately #full? is not a method of File :(
Note that I’m using a FIFO file (created with “mkfifo file”), so it is
not “stored” in the filesystem. Instead it’s just communication between
two processes at the OS level via the OS’s buffers.

On 01/07/2010 08:01 PM, Phillip G. wrote:

processes at the OS level via the OS’s buffers.

Yeah, I gathered that from your other posts. The general point, though,
still applies: check the pipe’s size, and if it grows too large, spin
off a new reading thread.

That’s something different from what you proposed initially, isn’t it?
This approach (increasing the number of readers if the pipe fills too
fast) is better because it regulates read performance according to load.

pipe.write unless pipe.full?

i.e. check if your pipe hits a set limit on disk, and generate an
exception if the pipe_file reaches (or is close to reaching) the limit.

You could then buffer the data to be written until an additional (or
new) reading thread has started.

IMHO this approach (local buffering if the pipe cannot be written to) is
not really helping because the pipe is a buffer already. In other
words, the same effect will happen - only later. The only argument in
favor of additional buffering I can see is less lock contention: if
every writer process has multiple threads that want to write to the
buffer, they could instead write to a Queue internally and a single
reader could read from that local queue and write to the global queue.
That would reduce the number of writers that compete for locks on the
global queue. Whether that is performant or not would need to be
tested.
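That local-buffering idea can be sketched as follows (my own
illustration; a plain in-process Queue stands in for the global DRb
queue):

```ruby
# Local buffering: many producer threads write only to an in-process
# queue, and a single drain thread is the sole writer to the shared
# queue, reducing the number of writers contending for its lock.
local  = Queue.new
shared = Queue.new   # stands in for the global DRb queue

drain = Thread.new do
  while (msg = local.deq)    # a nil message acts as the stop signal
    shared.enq(msg)          # only this thread touches the shared queue
  end
end

producers = (1..4).map do |i|
  Thread.new { 25.times { |n| local.enq("worker #{i} msg #{n}") } }
end
producers.each(&:join)

local.enq(nil)               # poison pill: shut down the drain thread
drain.join
```

Whether funnelling everything through one drain thread actually wins
depends on how expensive the shared queue’s writes are, which is exactly
the part that would need to be measured.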

Nevertheless I would start with a simple solution, monitor its
performance and change the implementation if it does not scale well
enough. Often simple solutions work surprisingly well… :)

Kind regards

robert

On 07.01.2010 19:50, Iñaki Baz C. wrote:

pipe.write unless pipe.full?

Unfortunately #full? is not a method of File :(

Well, yes, you’d have to implement the method (or something like it)
yourself. ;-)

Note that I’m using a FIFO file (created with “mkfifo file”), so it is
not “stored” in the filesystem. Instead it’s just communication between
two processes at the OS level via the OS’s buffers.

Yeah, I gathered that from your other posts. The general point, though,
still applies: check the pipe’s size, and if it grows too large, spin
off a new reading thread.

On Thursday, 7 January 2010, Robert K. wrote:

Yeah, I gathered that from your other posts. The general point, though,

still applies: check the pipe’s size, and if it grows too large, spin
off a new reading thread.

That’s something different from what you proposed initially, isn’t it?
This approach (increasing the number of readers if the pipe fills too
fast) is better because it regulates read performance according to load.

Definitely I have no idea how to know the status of a FIFO (not an IO
pipe but a FIFO file). The only thing that happens when it’s full
(because no reader is getting the data) is that the writer’s #flush
operation blocks.
I’ve found no way to determine how “full” a FIFO file is.

Iñaki Baz C. [email protected] wrote:

Hi, I run Unicorn, which is a Rack HTTP server using N forked worker processes.
I need the following:

  • When a worker processes an HTTP request it must send some data to another
    independent Ruby process XXX (separate from Unicorn).

  • This communication must be non-blocking, that is, the Unicorn worker process
    sends the notification and doesn’t wait for a response from process XXX, so
    the Unicorn worker can immediately generate the HTTP response, send it back
    to the client, and become free to handle new HTTP requests.

If stressed enough, everything has to block/reject or run your systems
out of memory/disk space :)

  • The Ruby process XXX should use some kind of queue system to store
    notifications and handle them. In fact, it should take them periodically
    and send them via TCP (but not HTTP) to another server.

Which is the best approach to design such communication? Perhaps using
something like EventMachine for the XXX process and Unix/TCP socket
communication between the Unicorn processes and the XXX process? Any
other alternative or suggestion?

If you only talk between processes on one machine (since you’re trying
FIFOs), you can also check out the “posix_mq” gem/library I started
recently:

posix_mq - POSIX message queues for Ruby

It’s less portable than FIFOs, but if you’re running a modern GNU/Linux
or FreeBSD it should work. The default queue sizes on Linux are small:
8192 bytes per message, and 10 messages in the queue. You’ll need root
to increase them.

But then FIFOs are hard-coded to 65536 bytes total under Linux and a
4096 byte PIPE_BUF (POSIX only requires a 512 byte PIPE_BUF).

On Thursday, 7 January 2010, Eric W. wrote:

If you only talk between processes on one machine (since you’re trying
FIFOs), you can also check out the “posix_mq” gem/library I started
recently:

    http://bogomips.org/ruby_posix_mq/

Really interesting. Is it safe to have various processes (Unicorn
workers) writing to a single posix_mq? Or will the data be “mixed”? Is
there any way to perform an “atomic” write operation on this queue?

Thanks.

On 07.01.2010 20:50, Robert K. wrote:

On 01/07/2010 08:01 PM, Phillip G. wrote:

That’s something different than you proposed initially, isn’t it? This
approach (increasing the number of readers if the pipe fills too fast)
is better because it regulates read performance according to load.

A little refined (in that I skipped the buffering), but it’s still the
same core: check the pipe, and spin off new threads as needed.

IMHO this approach (local buffering if the pipe cannot be written to) is
not really helping because the pipe is a buffer already. In other
words, the same effect will happen - only later. The only argument in
favor of additional buffering I can see is less lock contention: if
every writer process has multiple threads that want to write to the
buffer, they could instead write to a Queue internally and a single
reader could read from that local queue and write to the global queue.
That would reduce the number of writers that compete for locks on the
global queue. Whether that is performant or not would need to be tested.

This might be a difference in interpretation: I see the pipe in this
instance as a simple inter-process communication solution, not per se a
buffer.

Otherwise: You are right.

Also in that performance would have to be tested, and the constraints
have to be known (Iñaki already mentioned that getting all the data is
less important to him, so buffering wouldn’t be strictly necessary,
either).

Nevertheless I would start with a simple solution, monitor its
performance and change the implementation if it does not scale well
enough. Often simple solutions work surprisingly well… :)

Indeed. And it’s easier to iterate from something simple than to
iterate from something complex, too. ;-)

Iñaki Baz C. [email protected] wrote:

The only thing that happens when it’s full (because no reader is
getting the data) is that the writer’s #flush operation blocks.
I’ve found no way to determine how “full” a FIFO file is.

FIFOs are pipes; they just have a name on the filesystem.

In any case, use IO#write_nonblock. Any writes you do will raise
Errno::EAGAIN if your FIFO/pipe is full.

See the pipe(7) manpage on a Linux machine, it provides a great overview
of pipe semantics for blocking/non-blocking operations.
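A sketch of that advice (my own illustration; an anonymous IO.pipe
stands in for the FIFO, which behaves the same way once both ends are
open):

```ruby
# Use IO#write_nonblock so a full pipe raises Errno::EAGAIN instead of
# blocking the writer. Each 512-byte message is below PIPE_BUF, so
# every successful write is atomic.
r, w = IO.pipe

msg = "x" * 512
written = 0
dropped = 0

1000.times do
  begin
    w.write_nonblock(msg)
    written += 1
  rescue IO::WaitWritable, Errno::EAGAIN
    dropped += 1   # pipe is full: drop (or locally buffer) the message
  end
end

w.close
r.close
```

With nobody reading, only as many writes succeed as fit into the pipe
buffer; the rest raise and are counted as dropped, and the writer never
blocks. The Unicorn worker can then decide what to do with messages it
could not deliver.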

Iñaki Baz C. [email protected] wrote:

On Thursday, 7 January 2010, Eric W. wrote:

If you only talk between processes on one machine (since you’re trying
FIFOs), you can also check out the “posix_mq” gem/library I started
recently:

    http://bogomips.org/ruby_posix_mq/

Really interesting. Is it safe to have various processes (Unicorn
workers) writing to a single posix_mq? Or will the data be “mixed”? Is
there any way to perform an “atomic” write operation on this queue?

These queues are completely atomic at the message level and descriptors
can be safely shared between processes/threads. SysV message queues
weren’t thread-safe, but POSIX ones are.

On Thursday, 7 January 2010, Eric W. wrote:

These queues are completely atomic at the message level and descriptors
can be safely shared between processes/threads. SysV message queues
weren’t thread-safe, but POSIX ones are.

Great!