Skip to content
Extraits de code Groupes Projets
message_handler.rb 1,89 ko
Newer Older
  • Learn to ignore specific revisions
  • Raphael's avatar
    Raphael a validé
    #   Copyright (c) 2010, Diaspora Inc.  This file is
    
    Raphael's avatar
    Raphael a validé
    #   licensed under the Affero General Public License version 3.  See
    #   the COPYRIGHT file.
    
    class MessageHandler
    
    
      NUM_TRIES = 3
      TIMEOUT = 5 #seconds
    
      def initialize
        @queue = EM::Queue.new
      end
    
      def add_get_request(destinations)
    
        [*destinations].each{ |dest| @queue.push(Message.new(:get, dest))}
    
      def add_post_request(destinations, body)
    
    maxwell's avatar
    maxwell a validé
        b = CGI::escape( body )
    
        [*destinations].each{|dest| @queue.push(Message.new(:post, dest, :body => b))}
    
    danielvincent's avatar
    danielvincent a validé
      def add_hub_notification(hub_url, feed_url)
        @queue.push(Message.new(:hub_publish, hub_url, :body => feed_url))
      end
    
    
      def process
        @queue.pop{ |query|
          case query.type
          when :post
    
            http = EventMachine::HttpRequest.new(query.destination).post :timeout => TIMEOUT, :body =>{:xml => query.body}
    
    maxwell's avatar
    maxwell a validé
            http.callback { process; process}
    
          when :get
            http = EventMachine::HttpRequest.new(query.destination).get :timeout => TIMEOUT
    
            http.callback {process}
    
    danielvincent's avatar
    danielvincent a validé
          when :hub_publish
    
            http = EventMachine::PubSubHubbub.new(query.destination).publish query.body, :timeout => TIMEOUT
    
    danielvincent's avatar
    danielvincent a validé
            http.callback {process}
    
          else
            raise "message is not a type I know!"
          end
    
    ilya's avatar
    ilya a validé
            Rails.logger.info(http.response)
            Rails.logger.info("Failure from #{query.destination}, retrying...")
    
    
            @queue.push query unless query.try_count >= NUM_TRIES
    
            process
          }
        } unless @queue.size == 0
      end
    
        attr_accessor :type, :destination, :body, :callback, :owner_url, :try_count
    
          @callback = opts[:callback] ||= lambda{ process; process }