Skip to content
Extraits de code Groupes Projets
Non vérifiée Valider ef057584 rédigé par Eugen Rochko's avatar Eugen Rochko Validation de GitHub
Parcourir les fichiers

Add support for managing multiple stream subscriptions in a single connection (#14524)

parent decc5b9a
Aucune branche associée trouvée
Aucune étiquette associée trouvée
Aucune requête de fusion associée trouvée
// @ts-check
import { connectStream } from '../stream'; import { connectStream } from '../stream';
import { import {
updateTimeline, updateTimeline,
...@@ -19,24 +21,59 @@ import { getLocale } from '../locales'; ...@@ -19,24 +21,59 @@ import { getLocale } from '../locales';
const { messages } = getLocale(); const { messages } = getLocale();
export function connectTimelineStream (timelineId, path, pollingRefresh = null, accept = null) { /**
* @param {number} max
* @return {number}
*/
const randomUpTo = max =>
Math.floor(Math.random() * Math.floor(max));
return connectStream (path, pollingRefresh, (dispatch, getState) => { /**
* @param {string} timelineId
* @param {string} channelName
* @param {Object.<string, string>} params
* @param {Object} options
* @param {function(Function, Function): void} [options.fallback]
* @param {function(object): boolean} [options.accept]
* @return {function(): void}
*/
export const connectTimelineStream = (timelineId, channelName, params = {}, options = {}) =>
connectStream(channelName, params, (dispatch, getState) => {
const locale = getState().getIn(['meta', 'locale']); const locale = getState().getIn(['meta', 'locale']);
let pollingId;
/**
* @param {function(Function, Function): void} fallback
*/
const useFallback = fallback => {
fallback(dispatch, () => {
pollingId = setTimeout(() => useFallback(fallback), 20000 + randomUpTo(20000));
});
};
return { return {
onConnect() { onConnect() {
dispatch(connectTimeline(timelineId)); dispatch(connectTimeline(timelineId));
if (pollingId) {
clearTimeout(pollingId);
pollingId = null;
}
}, },
onDisconnect() { onDisconnect() {
dispatch(disconnectTimeline(timelineId)); dispatch(disconnectTimeline(timelineId));
if (options.fallback) {
pollingId = setTimeout(() => useFallback(options.fallback), randomUpTo(40000));
}
}, },
onReceive (data) { onReceive (data) {
switch(data.event) { switch(data.event) {
case 'update': case 'update':
dispatch(updateTimeline(timelineId, JSON.parse(data.payload), accept)); dispatch(updateTimeline(timelineId, JSON.parse(data.payload), options.accept));
break; break;
case 'delete': case 'delete':
dispatch(deleteFromTimelines(data.payload)); dispatch(deleteFromTimelines(data.payload));
...@@ -63,17 +100,59 @@ export function connectTimelineStream (timelineId, path, pollingRefresh = null, ...@@ -63,17 +100,59 @@ export function connectTimelineStream (timelineId, path, pollingRefresh = null,
}, },
}; };
}); });
}
/**
* @param {Function} dispatch
* @param {function(): void} done
*/
const refreshHomeTimelineAndNotification = (dispatch, done) => { const refreshHomeTimelineAndNotification = (dispatch, done) => {
dispatch(expandHomeTimeline({}, () => dispatch(expandHomeTimeline({}, () =>
dispatch(expandNotifications({}, () => dispatch(expandNotifications({}, () =>
dispatch(fetchAnnouncements(done)))))); dispatch(fetchAnnouncements(done))))));
}; };
export const connectUserStream = () => connectTimelineStream('home', 'user', refreshHomeTimelineAndNotification); /**
export const connectCommunityStream = ({ onlyMedia } = {}) => connectTimelineStream(`community${onlyMedia ? ':media' : ''}`, `public:local${onlyMedia ? ':media' : ''}`); * @return {function(): void}
export const connectPublicStream = ({ onlyMedia, onlyRemote } = {}) => connectTimelineStream(`public${onlyRemote ? ':remote' : ''}${onlyMedia ? ':media' : ''}`, `public${onlyRemote ? ':remote' : ''}${onlyMedia ? ':media' : ''}`); */
export const connectHashtagStream = (id, tag, local, accept) => connectTimelineStream(`hashtag:${id}${local ? ':local' : ''}`, `hashtag${local ? ':local' : ''}&tag=${tag}`, null, accept); export const connectUserStream = () =>
export const connectDirectStream = () => connectTimelineStream('direct', 'direct'); connectTimelineStream('home', 'user', {}, { fallback: refreshHomeTimelineAndNotification });
export const connectListStream = id => connectTimelineStream(`list:${id}`, `list&list=${id}`);
/**
* @param {Object} options
* @param {boolean} [options.onlyMedia]
* @return {function(): void}
*/
export const connectCommunityStream = ({ onlyMedia } = {}) =>
connectTimelineStream(`community${onlyMedia ? ':media' : ''}`, `public:local${onlyMedia ? ':media' : ''}`);
/**
* @param {Object} options
* @param {boolean} [options.onlyMedia]
* @param {boolean} [options.onlyRemote]
* @return {function(): void}
*/
export const connectPublicStream = ({ onlyMedia, onlyRemote } = {}) =>
connectTimelineStream(`public${onlyRemote ? ':remote' : ''}${onlyMedia ? ':media' : ''}`, `public${onlyRemote ? ':remote' : ''}${onlyMedia ? ':media' : ''}`);
/**
* @param {string} columnId
* @param {string} tagName
* @param {boolean} onlyLocal
* @param {function(object): boolean} accept
* @return {function(): void}
*/
export const connectHashtagStream = (columnId, tagName, onlyLocal, accept) =>
connectTimelineStream(`hashtag:${columnId}${onlyLocal ? ':local' : ''}`, `hashtag${onlyLocal ? ':local' : ''}`, { tag: tagName }, { accept });
/**
* @return {function(): void}
*/
export const connectDirectStream = () =>
connectTimelineStream('direct', 'direct');
/**
* @param {string} listId
* @return {function(): void}
*/
export const connectListStream = listId =>
connectTimelineStream(`list:${listId}`, 'list', { list: listId });
// @ts-check
import WebSocketClient from '@gamestdio/websocket'; import WebSocketClient from '@gamestdio/websocket';
const randomIntUpTo = max => Math.floor(Math.random() * Math.floor(max)); /**
* @type {WebSocketClient | undefined}
*/
let sharedConnection;
const knownEventTypes = [ /**
'update', * @typedef Subscription
'delete', * @property {string} channelName
'notification', * @property {Object.<string, string>} params
'conversation', * @property {function(): void} onConnect
'filters_changed', * @property {function(StreamEvent): void} onReceive
]; * @property {function(): void} onDisconnect
*/
export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) { /**
return (dispatch, getState) => { * @typedef StreamEvent
const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); * @property {string} event
const accessToken = getState().getIn(['meta', 'access_token']); * @property {object} payload
const { onConnect, onDisconnect, onReceive } = callbacks(dispatch, getState); */
let polling = null; /**
* @type {Array.<Subscription>}
*/
const subscriptions = [];
const setupPolling = () => { /**
pollingRefresh(dispatch, () => { * @type {Object.<string, number>}
polling = setTimeout(() => setupPolling(), 20000 + randomIntUpTo(20000)); */
}); const subscriptionCounters = {};
};
/**
* @param {Subscription} subscription
*/
const addSubscription = subscription => {
subscriptions.push(subscription);
};
/**
* @param {Subscription} subscription
*/
const removeSubscription = subscription => {
const index = subscriptions.indexOf(subscription);
if (index !== -1) {
subscriptions.splice(index, 1);
}
};
/**
* @param {Subscription} subscription
*/
const subscribe = ({ channelName, params, onConnect }) => {
const key = channelNameWithInlineParams(channelName, params);
subscriptionCounters[key] = subscriptionCounters[key] || 0;
if (subscriptionCounters[key] === 0) {
sharedConnection.send(JSON.stringify({ type: 'subscribe', stream: channelName, ...params }));
}
subscriptionCounters[key] += 1;
onConnect();
};
/**
* @param {Subscription} subscription
*/
const unsubscribe = ({ channelName, params, onDisconnect }) => {
const key = channelNameWithInlineParams(channelName, params);
const clearPolling = () => { subscriptionCounters[key] = subscriptionCounters[key] || 1;
if (polling) {
clearTimeout(polling); if (subscriptionCounters[key] === 1 && sharedConnection.readyState === WebSocketClient.OPEN) {
polling = null; sharedConnection.send(JSON.stringify({ type: 'unsubscribe', stream: channelName, ...params }));
}
subscriptionCounters[key] -= 1;
onDisconnect();
};
const sharedCallbacks = {
connected () {
subscriptions.forEach(subscription => subscribe(subscription));
},
received (data) {
const { stream } = data;
subscriptions.filter(({ channelName, params }) => {
const streamChannelName = stream[0];
if (stream.length === 1) {
return channelName === streamChannelName;
} }
};
const subscription = getStream(streamingAPIBaseURL, accessToken, path, { const streamIdentifier = stream[1];
connected () {
if (pollingRefresh) {
clearPolling();
}
onConnect(); if (['hashtag', 'hashtag:local'].includes(channelName)) {
}, return channelName === streamChannelName && params.tag === streamIdentifier;
} else if (channelName === 'list') {
return channelName === streamChannelName && params.list === streamIdentifier;
}
disconnected () { return false;
if (pollingRefresh) { }).forEach(subscription => {
polling = setTimeout(() => setupPolling(), randomIntUpTo(40000)); subscription.onReceive(data);
} });
},
onDisconnect(); disconnected () {
subscriptions.forEach(({ onDisconnect }) => onDisconnect());
},
reconnected () {
subscriptions.forEach(subscription => subscribe(subscription));
},
};
/**
* @param {string} channelName
* @param {Object.<string, string>} params
* @return {string}
*/
const channelNameWithInlineParams = (channelName, params) => {
if (Object.keys(params).length === 0) {
return channelName;
}
return `${channelName}&${Object.keys(params).map(key => `${key}=${params[key]}`).join('&')}`;
};
/**
* @param {string} channelName
* @param {Object.<string, string>} params
* @param {function(Function, Function): { onConnect: (function(): void), onReceive: (function(StreamEvent): void), onDisconnect: (function(): void) }} callbacks
* @return {function(): void}
*/
export const connectStream = (channelName, params, callbacks) => (dispatch, getState) => {
const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']);
const accessToken = getState().getIn(['meta', 'access_token']);
const { onConnect, onReceive, onDisconnect } = callbacks(dispatch, getState);
// If we cannot use a websockets connection, we must fall back
// to using individual connections for each channel
if (!streamingAPIBaseURL.startsWith('ws')) {
const connection = createConnection(streamingAPIBaseURL, accessToken, channelNameWithInlineParams(channelName, params), {
connected () {
onConnect();
}, },
received (data) { received (data) {
onReceive(data); onReceive(data);
}, },
reconnected () { disconnected () {
if (pollingRefresh) { onDisconnect();
clearPolling(); },
pollingRefresh(dispatch);
}
reconnected () {
onConnect(); onConnect();
}, },
}); });
const disconnect = () => { return () => {
if (subscription) { connection.close();
subscription.close();
}
clearPolling();
}; };
}
const subscription = {
channelName,
params,
onConnect,
onReceive,
onDisconnect,
};
addSubscription(subscription);
// If a connection is open, we can execute the subscription right now. Otherwise,
// because we have already registered it, it will be executed on connect
if (!sharedConnection) {
sharedConnection = /** @type {WebSocketClient} */ (createConnection(streamingAPIBaseURL, accessToken, '', sharedCallbacks));
} else if (sharedConnection.readyState === WebSocketClient.OPEN) {
subscribe(subscription);
}
return disconnect; return () => {
removeSubscription(subscription);
unsubscribe(subscription);
}; };
} };
const KNOWN_EVENT_TYPES = [
'update',
'delete',
'notification',
'conversation',
'filters_changed',
'encrypted_message',
'announcement',
'announcement.delete',
'announcement.reaction',
];
/**
* @param {MessageEvent} e
* @param {function(StreamEvent): void} received
*/
const handleEventSourceMessage = (e, received) => {
received({
event: e.type,
payload: e.data,
});
};
/**
* @param {string} streamingAPIBaseURL
* @param {string} accessToken
* @param {string} channelName
* @param {{ connected: Function, received: function(StreamEvent): void, disconnected: Function, reconnected: Function }} callbacks
* @return {WebSocketClient | EventSource}
*/
const createConnection = (streamingAPIBaseURL, accessToken, channelName, { connected, received, disconnected, reconnected }) => {
const params = channelName.split('&');
export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) { channelName = params.shift();
const params = stream.split('&');
stream = params.shift();
if (streamingAPIBaseURL.startsWith('ws')) { if (streamingAPIBaseURL.startsWith('ws')) {
params.unshift(`stream=${stream}`);
const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken); const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken);
ws.onopen = connected; ws.onopen = connected;
...@@ -92,11 +241,19 @@ export default function getStream(streamingAPIBaseURL, accessToken, stream, { co ...@@ -92,11 +241,19 @@ export default function getStream(streamingAPIBaseURL, accessToken, stream, { co
return ws; return ws;
} }
stream = stream.replace(/:/g, '/'); channelName = channelName.replace(/:/g, '/');
if (channelName.endsWith(':media')) {
channelName = channelName.replace('/media', '');
params.push('only_media=true');
}
params.push(`access_token=${accessToken}`); params.push(`access_token=${accessToken}`);
const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${stream}?${params.join('&')}`);
const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${channelName}?${params.join('&')}`);
let firstConnect = true; let firstConnect = true;
es.onopen = () => { es.onopen = () => {
if (firstConnect) { if (firstConnect) {
firstConnect = false; firstConnect = false;
...@@ -105,15 +262,12 @@ export default function getStream(streamingAPIBaseURL, accessToken, stream, { co ...@@ -105,15 +262,12 @@ export default function getStream(streamingAPIBaseURL, accessToken, stream, { co
reconnected(); reconnected();
} }
}; };
for (let type of knownEventTypes) {
es.addEventListener(type, (e) => { KNOWN_EVENT_TYPES.forEach(type => {
received({ es.addEventListener(type, e => handleEventSourceMessage(/** @type {MessageEvent} */ (e), received));
event: e.type, });
payload: e.data,
}); es.onerror = /** @type {function(): void} */ (disconnected);
});
}
es.onerror = disconnected;
return es; return es;
}; };
Ce diff est replié.
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter