@@ 152,6 152,28 @@ const redisConfigFromEnv = (env) => {
};
};
+const PUBLIC_CHANNELS = [
+ 'public',
+ 'public:media',
+ 'public:local',
+ 'public:local:media',
+ 'public:remote',
+ 'public:remote:media',
+ 'hashtag',
+ 'hashtag:local',
+];
+
+// Used for priming the counters/gauges for the various metrics that are
+// per-channel
+const CHANNEL_NAMES = [
+ 'system',
+ 'user',
+ 'user:notification',
+ 'list',
+ 'direct',
+ ...PUBLIC_CHANNELS
+];
+
const startServer = async () => {
const app = express();
@@ 203,9 225,6 @@ const startServer = async () => {
labelNames: ['type'],
});
- connectedClients.set({ type: 'websocket' }, 0);
- connectedClients.set({ type: 'eventsource' }, 0);
-
const connectedChannels = new metrics.Gauge({
name: 'connected_channels',
help: 'The number of channels the streaming server is streaming to',
@@ 217,6 236,35 @@ const startServer = async () => {
help: 'The number of Redis channels the streaming server is subscribed to',
});
+ const redisMessagesReceived = new metrics.Counter({
+ name: 'redis_messages_received_total',
+ help: 'The total number of messages the streaming server has received from redis subscriptions'
+ });
+
+ const messagesSent = new metrics.Counter({
+ name: 'messages_sent_total',
+ help: 'The total number of messages the streaming server sent to clients per connection type',
+ labelNames: [ 'type' ]
+ });
+
+ // Prime the gauges so we don't loose metrics between restarts:
+ redisSubscriptions.set(0);
+ connectedClients.set({ type: 'websocket' }, 0);
+ connectedClients.set({ type: 'eventsource' }, 0);
+
+ // For each channel, initialize the gauges at zero; There's only a finite set of channels available
+ CHANNEL_NAMES.forEach(( channel ) => {
+ connectedChannels.set({ type: 'websocket', channel }, 0);
+ connectedChannels.set({ type: 'eventsource', channel }, 0);
+ })
+
+ // Prime the counters so that we don't loose metrics between restarts.
+ // Unfortunately counters don't support the set() API, so instead I'm using
+ // inc(0) to achieve the same result.
+ redisMessagesReceived.inc(0);
+ messagesSent.inc({ type: 'websocket' }, 0);
+ messagesSent.inc({ type: 'eventsource' }, 0);
+
// When checking metrics in the browser, the favicon is requested this
// prevents the request from falling through to the API Router, which would
// error for this endpoint:
@@ 262,6 310,8 @@ const startServer = async () => {
* @param {string} message
*/
const onRedisMessage = (channel, message) => {
+ redisMessagesReceived.inc();
+
const callbacks = subs[channel];
log.silly(`New message on channel ${redisPrefix}${channel}`);
@@ 490,17 540,6 @@ const startServer = async () => {
}
};
- const PUBLIC_CHANNELS = [
- 'public',
- 'public:media',
- 'public:local',
- 'public:local:media',
- 'public:remote',
- 'public:remote:media',
- 'hashtag',
- 'hashtag:local',
- ];
-
/**
* @param {any} req
* @param {string|undefined} channelName
@@ 705,10 744,11 @@ const startServer = async () => {
* @param {any} req
* @param {function(string, string): void} output
* @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
+ * @param {'websocket' | 'eventsource'} destinationType
* @param {boolean=} needsFiltering
* @returns {SubscriptionListener}
*/
- const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
+ const streamFrom = (ids, req, output, attachCloseHandler, destinationType, needsFiltering = false) => {
const accountId = req.accountId || req.remoteAddress;
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
@@ 717,6 757,8 @@ const startServer = async () => {
// TODO: Replace "string"-based delete payloads with object payloads:
const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
+ messagesSent.labels({ type: destinationType }).inc(1);
+
log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload}`);
output(event, encodedPayload);
};
@@ 1031,7 1073,7 @@ const startServer = async () => {
const onSend = streamToHttp(req, res);
const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
- streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering);
+ streamFrom(channelIds, req, onSend, onEnd, 'eventsource', options.needsFiltering);
}).catch(err => {
log.verbose(req.requestId, 'Subscription error:', err.toString());
httpNotFound(res);
@@ 1241,7 1283,7 @@ const startServer = async () => {
const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
const stopHeartbeat = subscriptionHeartbeat(channelIds);
- const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);
+ const listener = streamFrom(channelIds, request, onSend, undefined, 'websocket', options.needsFiltering);
connectedChannels.labels({ type: 'websocket', channel: channelName }).inc();