@@ 622,29 622,39 @@ const startServer = async () => {
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
- // Currently message is of type string, soon it'll be Record<string, any>
- const listener = message => {
- const { event, payload, queued_at } = message;
-
- const transmit = (payload) => {
- const now = new Date().getTime();
- const delta = now - queued_at;
- const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
+ const transmit = (event, payload) => {
+ // TODO: Replace "string"-based delete payloads with object payloads:
+ const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
- log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
- output(event, encodedPayload);
- };
+ log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload}`);
+ output(event, encodedPayload);
+ };
- // Only messages that may require filtering are statuses, since notifications
- // are already personalized and deletes do not matter
- if (!needsFiltering || event !== 'update') {
- transmit(payload);
+ // The listener used to process each message off the redis subscription,
+ // message here is an object with an `event` and `payload` property. Some
+ // events also include a queued_at value, but this is being removed shortly.
+ const listener = message => {
+ const { event, payload } = message;
+
+ // Streaming only needs to apply filtering to some channels and only to
+ // some events. This is because majority of the filtering happens on the
+ // Ruby on Rails side when producing the event for streaming.
+ //
+ // The only events that require filtering from the streaming server are
+ // `update` and `status.update`, all other events are transmitted to the
+ // client as soon as they're received (pass-through).
+ //
+ // The channels that need filtering are determined in the function
+ // `channelNameToIds` defined below:
+ if (!needsFiltering || (event !== 'update' && event !== 'status.update')) {
+ transmit(event, payload);
return;
}
- const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id));
- const accountDomain = payload.account.acct.split('@')[1];
+ // The rest of the logic from here on in this function is to handle
+ // filtering of statuses:
+ // Filter based on language:
if (Array.isArray(req.chosenLanguages) && payload.language !== null && req.chosenLanguages.indexOf(payload.language) === -1) {
log.silly(req.requestId, `Message ${payload.id} filtered by language (${payload.language})`);
return;
@@ 652,11 662,16 @@ const startServer = async () => {
// When the account is not logged in, it is not necessary to confirm the block or mute
if (!req.accountId) {
- transmit(payload);
+ transmit(event, payload);
return;
}
- pgPool.connect((err, client, done) => {
+ // Filter based on domain blocks, blocks, mutes, or custom filters:
+ const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id));
+ const accountDomain = payload.account.acct.split('@')[1];
+
+ // TODO: Move this logic out of the message handling loop
+ pgPool.connect((err, client, releasePgConnection) => {
if (err) {
log.error(err);
return;
@@ 683,28 698,45 @@ const startServer = async () => {
}
Promise.all(queries).then(values => {
- done();
+ releasePgConnection();
+ // Handling blocks & mutes and domain blocks: If one of those applies,
+ // then we don't transmit the payload of the event to the client
if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
return;
}
- if (!payload.filtered && !req.cachedFilters) {
+ // If the payload already contains the `filtered` property, it means
+ // that filtering has been applied on the ruby on rails side, as
+ // such, we don't need to construct or apply the filters in streaming:
+ if (Object.prototype.hasOwnProperty.call(payload, "filtered")) {
+ transmit(event, payload);
+ return;
+ }
+
+ // Handling for constructing the custom filters and caching them on the request
+ // TODO: Move this logic out of the message handling lifecycle
+ if (!req.cachedFilters) {
const filterRows = values[accountDomain ? 2 : 1].rows;
- req.cachedFilters = filterRows.reduce((cache, row) => {
- if (cache[row.id]) {
- cache[row.id].keywords.push([row.keyword, row.whole_word]);
+ req.cachedFilters = filterRows.reduce((cache, filter) => {
+ if (cache[filter.id]) {
+ cache[filter.id].keywords.push([filter.keyword, filter.whole_word]);
} else {
- cache[row.id] = {
- keywords: [[row.keyword, row.whole_word]],
- expires_at: row.expires_at,
- repr: {
- id: row.id,
- title: row.title,
- context: row.context,
- expires_at: row.expires_at,
- filter_action: ['warn', 'hide'][row.filter_action],
+ cache[filter.id] = {
+ keywords: [[filter.keyword, filter.whole_word]],
+ expires_at: filter.expires_at,
+ filter: {
+ id: filter.id,
+ title: filter.title,
+ context: filter.context,
+ expires_at: filter.expires_at,
+ // filter.filter_action is the value from the
+ // custom_filters.action database column, it is an integer
+ // representing a value in an enum defined by Ruby on Rails:
+ //
+ // enum { warn: 0, hide: 1 }
+ filter_action: ['warn', 'hide'][filter.filter_action],
},
};
}
@@ 712,6 744,10 @@ const startServer = async () => {
return cache;
}, {});
+ // Construct the regular expressions for the custom filters: This
+ // needs to be done in a separate loop as the database returns one
+ // filterRow per keyword, so we need all the keywords before
+ // constructing the regular expression
Object.keys(req.cachedFilters).forEach((key) => {
req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => {
let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
@@ 731,34 767,56 @@ const startServer = async () => {
});
}
- // Check filters
- if (req.cachedFilters && !payload.filtered) {
- const mutatedPayload = { ...payload };
+ // Apply cachedFilters against the payload, constructing a
+ // `filter_results` array of FilterResult entities
+ if (req.cachedFilters) {
const status = payload;
- const searchContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
- const searchIndex = JSDOM.fragment(searchContent).textContent;
+ // TODO: Calculate searchableContent in Ruby on Rails:
+ const searchableContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
+ const searchableTextContent = JSDOM.fragment(searchableContent).textContent;
const now = new Date();
- mutatedPayload.filtered = [];
- Object.values(req.cachedFilters).forEach((cachedFilter) => {
- if ((cachedFilter.expires_at === null || cachedFilter.expires_at > now)) {
- const keyword_matches = searchIndex.match(cachedFilter.regexp);
- if (keyword_matches) {
- mutatedPayload.filtered.push({
- filter: cachedFilter.repr,
- keyword_matches,
- });
- }
+ const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => {
+ // Check the filter hasn't expired before applying:
+ if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) {
+ return;
}
- });
- transmit(mutatedPayload);
+ // Just in-case JSDOM fails to find textContent in searchableContent
+ if (!searchableTextContent) {
+ return;
+ }
+
+ const keyword_matches = searchableTextContent.match(cachedFilter.regexp);
+ if (keyword_matches) {
+ // results is an Array of FilterResult; status_matches is always
+ // null as we only are only applying the keyword-based custom
+ // filters, not the status-based custom filters.
+ // https://docs.joinmastodon.org/entities/FilterResult/
+ results.push({
+ filter: cachedFilter.filter,
+ keyword_matches,
+ status_matches: null
+ });
+ }
+ }, []);
+
+ // Send the payload + the FilterResults as the `filtered` property
+ // to the streaming connection. To reach this code, the `event` must
+ // have been either `update` or `status.update`, meaning the
+ // `payload` is a Status entity, which has a `filtered` property:
+ //
+ // filtered: https://docs.joinmastodon.org/entities/Status/#filtered
+ transmit(event, {
+ ...payload,
+ filtered: filter_results
+ });
} else {
- transmit(payload);
+ transmit(event, payload);
}
}).catch(err => {
+ releasePgConnection();
log.error(err);
- done();
});
});
};