I've been playing around with Ruby lately, trying to write a UDP server. Of course, the way to write servers in Ruby is apparently (I'm still a bit new at this Ruby malarky) to subclass GServer.
And what a fine piece of code GServer is! Really does the job and shows just how easy it is to do stuff in Ruby. If you want to a TCP server.
But I needed a UDP server. So after much messing about I came up with the following half-baked solution. Please feel free to tear to pieces. I should remind you however that this code is WOMM certified.
# with apologies to John W. Small require "socket" require "thread" class UServer DEFAULT_HOST = "127.0.0.1" DEFAULT_TRANSPORT = "TCP"; def serve(io,content) end @@services = {} # Hash of opened ports, i.e. services @@servicesMutex = Mutex.new def UServer.stop(port, host = DEFAULT_HOST, trans = DEFAULT_TRANSPORT) @@servicesMutex.synchronize { @@services[host][port][trans].stop } end def UServer.in_service?(port, host = DEFAULT_HOST, trans = DEFAULT_TRANSPORT) @@services.has_key?(host) and @@services[host].has_key?(port) and @@services[host][port].has_key?(trans) end def stop @connectionsMutex.synchronize { if @serverThread @serverThread.raise "stop" end } end def stopped? nil == @serverThread end def shutdown @shutdown = true end def connections @connections.size end def join @serverThread.join if @serverThread end attr_reader :port, :host, :trans, :maxConnections attr_accessor :stdlog, :audit, :debug def connecting(client) addr = client.peeraddr log("#{self.class.to_s} #{@host}:#{@port} #{@trans} client:#{addr[1]} " + "#{addr[2]}<#{addr[3]}> connect") true end def disconnecting(clientPort) log("#{self.class.to_s} #{@host}:#{@port} #{@trans}" + "client:#{clientPort} disconnect") end protected :connecting, :disconnecting def starting() log("#{self.class.to_s} #{@host}:#{@port} #{@trans} start") end def stopping() log("#{self.class.to_s} #{@host}:#{@port} #{@trans} stop") end protected :starting, :stopping def error(detail) log(detail.backtrace.join("n")) end def log(msg) if @stdlog @stdlog.puts("[#{Time.new.ctime}] %s" % msg) @stdlog.flush end end protected :error, :log def initialize(port, host = DEFAULT_HOST, trans = DEFAULT_TRANSPORT, maxConnections = 4, stdlog = $stderr, audit = false, debug = false) @serverThread = nil @port = port @host = host @trans = trans @maxConnections = maxConnections @connections = [] @connectionsMutex = Mutex.new @connectionsCV = ConditionVariable.new @stdlog = stdlog @audit = audit @debug = debug end def start(maxConnections = -1) raise "running" if !stopped? @shutdown = false @maxConnections = maxConnections if maxConnections > 0 @@servicesMutex.synchronize { if UServer.in_service?(@port,@host,@trans) raise "Port already in use: #{host}:#{@port} #{@trans}!" end if "TCP" == @trans @server = ContentTCPServer.new(@host,@port) else @server = ContentUDPServer.new(@host,@port) end @@services[@host] = {} unless @@services.has_key?(@host) @@services[@host][@port] = {} unless @@services[@host].has_key?(@port) @@services[@host][@port][@trans] = self; } @serverThread = Thread.new { begin starting if @audit while !@shutdown @connectionsMutex.synchronize { puts "start @con.size=#{@connections.size}" while @connections.size >= @maxConnections @connectionsCV.wait(@connectionsMutex) end } client, port, close, content = @server.accept Thread.new(client,port,close,content) { |myClient, myPort, myClose, myContent| @connectionsMutex.synchronize { @connections << Thread.current } begin serve(myClient,myContent) if !@audit or connecting(myClient) puts "finished serve" rescue => detail error(detail) if @debug ensure begin if myClose myClient.close end rescue end @connectionsMutex.synchronize { @connections.delete(Thread.current) @connectionsCV.signal } disconnecting(myPort) if @audit end } end rescue => detail error(detail) if @debug ensure begin @server.close rescue end if @shutdown @connectionsMutex.synchronize { while @connections.size > 0 @connectionsCV.wait(@connectionsMutex) end } else @connections.each { |c| c.raise "stop" } end @serverThread = nil @@servicesMutex.synchronize { @@services[@host][@port].delete(@trans) } stopping if @audit end } self end end class ContentTCPServer def initialize( host, port ) @server = TCPServer.new(host,port) end def accept client = @server.accept return [client,client.peeraddr[1],true,client.gets(nil)] end def close @server.close end end class ContentUDPServer def initialize( host, port ) puts "init" @socket = UDPSocket.new puts "new s: #{@socket} on #{host}:#{port}" @socket.bind(host, port) puts "bound: #{@socket}" end def accept puts "accept" packet = @socket.recvfrom(1024) return [@socket, 0, false, packet[0]] end def close @socket.close end end
Lovely! Who says Java can't be translated into Ruby? I even used duck typing!
Anyway, see if you can spot the significant difference in thread handling between this and the original GServer.
Well, OK, here it is:
GServer says:
@connections << Thread.new(client) { |myClient| ... }
I say:
Thread.new(client,port,close,content) { |myClient, myPort, myClose, myContent| @connectionsMutex.synchronize { @connections << Thread.current } ... }
What happened was that UDP packets were handled so quickly that the thread was (mostly) never placed in the @connections
list until after it was (supposedly) removed from the list by
@connectionsMutex.synchronize { @connections.delete(Thread.current) @connectionsCV.signal }
at the end of the request thread. Seems like a nasty race condition to me. My code just makes sure that the thread is placed into the @connections
list before nasty stuff can happen.
Am I right? You tell me…