Skip to content
Extraits de code Groupes Projets
message_handler.rb 2,72 ko
Newer Older
  • Learn to ignore specific revisions
  • maxwell's avatar
    maxwell a validé
      
    
      # Retry ceiling: a message that failed is only pushed back onto the queue
      # while its try_count is still below this value.
      NUM_TRIES = 3
      # Value passed as the :timeout option of the requests built in #process.
      TIMEOUT = 5 # seconds
    
    maxwell's avatar
    maxwell a validé
      
    
      # Sets up the handler with an empty EventMachine queue that holds the
      # messages waiting to be sent.
      def initialize
        @queue = EventMachine::Queue.new
      end
    
      # Enqueues one :get message for each entry of +destinations+.
      #
      # destinations - collection responding to #each; every element is used
      #                as the destination of its own message.
      #
      # Returns whatever destinations.each returns.
      def add_get_request(destinations)
        destinations.each do |target|
          request = Message.new(:get, target)
          @queue.push(request)
        end
      end
    
      # Enqueues a single :ostatus_subscribe message addressed to +feed_url+.
      #
      # feed_url - destination of the subscription message.
      #
      # Returns the result of pushing the message onto the queue.
      def add_subscription_request(feed_url)
        subscription = Message.new(:ostatus_subscribe, feed_url)
        @queue.push(subscription)
      end
    
      # Enqueues one :post message for each entry of +destinations+, all
      # carrying the same CGI-escaped copy of +body+.
      #
      # destinations - collection responding to #each; every element is used
      #                as the destination of its own message.
      # body         - String payload; it is escaped once, before the loop, so
      #                every message shares the same escaped string.
      #
      # Returns whatever destinations.each returns.
      def add_post_request(destinations, body)
        escaped_body = CGI.escape(body)
        destinations.each { |dest| @queue.push(Message.new(:post, dest, :body => escaped_body)) }
      end
    
      # pubsubhubbub
      # Enqueues a :hub_publish message addressed to +hub_url+ whose :body is
      # +feed_url+.
      #
      # hub_url  - destination of the message (the hub).
      # feed_url - value stored as the message body.
      #
      # Returns the result of pushing the message onto the queue.
      def add_hub_notification(hub_url, feed_url)
        @queue.push(Message.new(:hub_publish, hub_url, :body => feed_url))
      end
    
      # Enqueues a :hub_subscribe message addressed to +hub_url+ whose :body is
      # +feed_url+.
      #
      # hub_url  - destination of the message (the hub).
      # feed_url - value stored as the message body.
      #
      # Returns the result of pushing the message onto the queue.
      def add_hub_subscription_request(hub_url, feed_url)
        subscription = Message.new(:hub_subscribe, hub_url, :body => feed_url)
        @queue.push(subscription)
      end
    
    
      # Handles the response to an :ostatus_subscribe fetch.
      #
      # Looks up the hub in the response via Diaspora::OStatusParser.find_hub,
      # enqueues a hub subscription for the destination of the original query,
      # and then hands the same response to Diaspora::OStatusParser.parse_sender.
      #
      # query_object - the message that was sent; only #destination is read.
      # http         - the finished request; only #response is read.
      #
      # Returns the result of Diaspora::OStatusParser.parse_sender.
      def process_ostatus_subscription(query_object, http)
        response = http.response
        hub = Diaspora::OStatusParser.find_hub(response)
        add_hub_subscription_request(hub, query_object.destination)
        Diaspora::OStatusParser.parse_sender(response)
      end
    
      # Pops one message off @queue (the whole pop is skipped when the queue
      # size is 0) and issues the request that matches the message type.
      # Each success callback registered below calls #process again, so the
      # queue keeps being worked on after a request completes.
      #
      # Raises RuntimeError when the message type is not one of the handled
      # symbols.
      #
      # NOTE(review): this copy of the method appears to have lost lines. The
      # statements after the `case` are closed by a `}` that has no visible
      # opener -- presumably a failure handler such as `http.errback {` was
      # there. Confirm against the repository before relying on this text.
      def process
        @queue.pop{ |query|
          case query.type
          # POST the message body as the :xml form field.
          when :post
    
            http = EventMachine::HttpRequest.new(query.destination).post :timeout => TIMEOUT, :body =>{:xml => query.body}
    
            # On success: print the destination, then call #process twice.
            http.callback { puts query.destination; process; process}
    
          # GET the destination and hand the response to #send_to_seed.
          when :get
            http = EventMachine::HttpRequest.new(query.destination).get :timeout => TIMEOUT
            http.callback {send_to_seed(query, http.response); process}
    
          # GET the destination; the finished request is passed to
          # #process_ostatus_subscription.
          when :ostatus_subscribe
    
    maxwell's avatar
    maxwell a validé
            puts query.destination
    
            http = EventMachine::HttpRequest.new(query.destination).get :timeout => TIMEOUT
    
            http.callback { process_ostatus_subscription(query, http); process}
    
          # Publish query.body to the hub at query.destination.
          when :hub_publish
    
            http = EventMachine::PubSubHubbub.new(query.destination).publish query.body, :timeout => TIMEOUT 
    
    maxwell's avatar
    maxwell a validé
            http.callback { process}
    
          # Subscribe to query.body at the hub, passing User.owner.url + 'hubbub'
          # as the second argument.
          # NOTE(review): no success callback is visible for this branch --
          # possibly another dropped line; confirm.
          when :hub_subscribe
    
            http = EventMachine::PubSubHubbub.new(query.destination).subscribe query.body, User.owner.url + 'hubbub',  :timeout => TIMEOUT 
    
          else
            raise "message is not a type I know!"
          end
    
            # Failure path as far as the messages indicate: print the response,
            # count the attempt, and re-queue the message unless it has reached
            # NUM_TRIES, then continue with the next message.
            puts http.response
    
    maxwell's avatar
    maxwell a validé
            puts "failure from #{query.destination}, retrying"
    
            query.try_count +=1
            @queue.push query unless query.try_count >= NUM_TRIES 
            process
          }
        } unless @queue.size == 0
      end
    
      # Placeholder hook invoked by #process after a successful :get request.
      # It currently ignores both arguments and does nothing.
      #
      # message       - the message that was sent.
      # http_response - the response received for it.
      #
      # Returns nil.
      def send_to_seed(message, http_response)
        # TODO: do something with the response.
        nil
      end
    
        attr_accessor :type, :destination, :body, :callback, :try_count
    
          @type = type
          @destination = dest
    
          @callback = opts[:callback] ||= lambda{ process; process }