Newer
Older
# licensed under the Affero General Public License version 3 or later. See
Daniel Vincent Grippi
a validé
module Diaspora
module WebSocket
danielgrippi
a validé
REDIS_CONNECTION_SET = 'ws-uids'
def self.redis
@redis ||= Resque.redis
end
def self.length
redis.llen :websocket
end
def self.queue_to_user(uid, data)
redis.lpush(:websocket, {:uid => uid, :data => data}.to_json)
end
def self.initialize_channels
def self.push_to_user(uid, data)
@channels[uid][0].push(data) if @channels[uid]
def self.subscribe(uid, ws)
Rails.logger.info "event=socket-subscribe uid=#{uid} channels=#{self.length}"
self.ensure_channel(uid)
@channels[uid][0].subscribe{ |msg| ws.send msg }
@channels[uid][1] += 1
danielgrippi
a validé
redis.sadd(REDIS_CONNECTION_SET, uid)
end
def self.ensure_channel(uid)
@channels[uid] ||= [EM::Channel.new, 0 ]
end
def self.unsubscribe(uid,sid)
Rails.logger.info "event=socket-unsubscribe sid=#{sid} uid=#{uid} channels=#{self.length}"
@channels[uid][0].unsubscribe(sid) if @channels[uid]
@channels[uid][1] -= 1
if @channels[uid][1] <= 0
@channels.delete(uid)
danielgrippi
a validé
redis.srem(REDIS_CONNECTION_SET, uid)
end
end
danielgrippi
a validé
def self.is_connected?(uid)
redis.sismember(REDIS_CONNECTION_SET, uid)
end
end
module Socketable
SocketsController.new.outgoing(user, self, opts)
def unsocket_from_user(user, opts={})
SocketsController.new.outgoing(user, Retraction.for(self), opts)
end
end
end