~cytrogen/masto-fe

301e03eb8cd258ec38212941736eba2ecc825331 — Emelia Smith 3 years ago 36631e4
Remove clustering from streaming API (#24655)

4 files changed, 53 insertions(+), 101 deletions(-)

M dist/mastodon-streaming.service
M package.json
M streaming/index.js
M yarn.lock
M dist/mastodon-streaming.service => dist/mastodon-streaming.service +1 -1
@@ 8,10 8,10 @@ User=mastodon
WorkingDirectory=/home/mastodon/live
Environment="NODE_ENV=production"
Environment="PORT=4000"
Environment="STREAMING_CLUSTER_NUM=1"
ExecStart=/usr/bin/node ./streaming
TimeoutSec=15
Restart=always
LimitNOFILE=65536
# Proc filesystem
ProcSubset=pid
ProtectProc=invisible

M package.json => package.json +0 -2
@@ 122,7 122,6 @@
    "substring-trie": "^1.0.2",
    "terser-webpack-plugin": "^4.2.3",
    "tesseract.js": "^2.1.1",
    "throng": "^4.0.0",
    "tiny-queue": "^0.2.1",
    "twitter-text": "3.1.0",
    "uuid": "^8.3.1",


@@ 175,7 174,6 @@
    "@types/react-toggle": "^4.0.3",
    "@types/redux-immutable": "^4.0.3",
    "@types/requestidlecallback": "^0.3.5",
    "@types/throng": "^4.0.2",
    "@types/uuid": "^8.3.4",
    "@types/webpack": "^4.41.33",
    "@types/yargs": "^17.0.22",

M streaming/index.js => streaming/index.js +52 -81
@@ 1,7 1,5 @@
// @ts-check

const os = require('os');
const throng = require('throng');
const dotenv = require('dotenv');
const express = require('express');
const http = require('http');


@@ 15,10 13,10 @@ const fs = require('fs');
const WebSocket = require('ws');
const { JSDOM } = require('jsdom');

const env = process.env.NODE_ENV || 'development';
const environment = process.env.NODE_ENV || 'development';

dotenv.config({
  path: env === 'production' ? '.env.production' : '.env',
  path: environment === 'production' ? '.env.production' : '.env',
});

log.level = process.env.LOG_LEVEL || 'verbose';


@@ 52,8 50,6 @@ const redisUrlToClient = async (defaultConfig, redisUrl) => {
  return client;
};

const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));

/**
 * @param {string} json
 * @param {any} req


@@ 72,45 68,38 @@ const parseJSON = (json, req) => {
  }
};

const startMaster = () => {
  if (!process.env.SOCKET && process.env.PORT && isNaN(+process.env.PORT)) {
    log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
  }

  log.warn(`Starting streaming API server master with ${numWorkers} workers`);
};

/**
 * @return {Object.<string, any>}
 * @param {Object.<string, any>} env the `process.env` value to read configuration from
 * @return {Object.<string, any>} the configuration for the PostgreSQL connection
 */
const pgConfigFromEnv = () => {
const pgConfigFromEnv = (env) => {
  const pgConfigs = {
    development: {
      user:     process.env.DB_USER || pg.defaults.user,
      password: process.env.DB_PASS || pg.defaults.password,
      database: process.env.DB_NAME || 'mastodon_development',
      host:     process.env.DB_HOST || pg.defaults.host,
      port:     process.env.DB_PORT || pg.defaults.port,
      user:     env.DB_USER || pg.defaults.user,
      password: env.DB_PASS || pg.defaults.password,
      database: env.DB_NAME || 'mastodon_development',
      host:     env.DB_HOST || pg.defaults.host,
      port:     env.DB_PORT || pg.defaults.port,
    },

    production: {
      user:     process.env.DB_USER || 'mastodon',
      password: process.env.DB_PASS || '',
      database: process.env.DB_NAME || 'mastodon_production',
      host:     process.env.DB_HOST || 'localhost',
      port:     process.env.DB_PORT || 5432,
      user:     env.DB_USER || 'mastodon',
      password: env.DB_PASS || '',
      database: env.DB_NAME || 'mastodon_production',
      host:     env.DB_HOST || 'localhost',
      port:     env.DB_PORT || 5432,
    },
  };

  let baseConfig;

  if (process.env.DATABASE_URL) {
    baseConfig = dbUrlToConfig(process.env.DATABASE_URL);
  if (env.DATABASE_URL) {
    baseConfig = dbUrlToConfig(env.DATABASE_URL);
  } else {
    baseConfig = pgConfigs[env];
    baseConfig = pgConfigs[environment];

    if (process.env.DB_SSLMODE) {
      switch(process.env.DB_SSLMODE) {
    if (env.DB_SSLMODE) {
      switch(env.DB_SSLMODE) {
      case 'disable':
      case '':
        baseConfig.ssl = false;


@@ 127,30 116,26 @@ const pgConfigFromEnv = () => {

  return {
    ...baseConfig,
    max: process.env.DB_POOL || 10,
    max: env.DB_POOL || 10,
    connectionTimeoutMillis: 15000,
    application_name: '',
  };
};

const startWorker = async (workerId) => {
  log.warn(`Starting worker ${workerId}`);

  const app = express();

  app.set('trust proxy', process.env.TRUSTED_PROXY_IP ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) : 'loopback,uniquelocal');

  const pgPool = new pg.Pool(pgConfigFromEnv());
  const server = http.createServer(app);
  const redisNamespace = process.env.REDIS_NAMESPACE || null;
/**
 * @param {Object.<string, any>} env the `process.env` value to read configuration from
 * @return {Object.<string, any>} configuration for the Redis connection
 */
const redisConfigFromEnv = (env) => {
  const redisNamespace = env.REDIS_NAMESPACE || null;

  const redisParams = {
    socket: {
      host: process.env.REDIS_HOST || '127.0.0.1',
      port: process.env.REDIS_PORT || 6379,
      host: env.REDIS_HOST || '127.0.0.1',
      port: env.REDIS_PORT || 6379,
    },
    database: process.env.REDIS_DB || 0,
    password: process.env.REDIS_PASSWORD || undefined,
    database: env.REDIS_DB || 0,
    password: env.REDIS_PASSWORD || undefined,
  };

  if (redisNamespace) {


@@ 159,13 144,30 @@ const startWorker = async (workerId) => {

  const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';

  return {
    redisParams,
    redisPrefix,
    redisUrl: env.REDIS_URL,
  };
};

const startServer = async () => {
  const app = express();

  app.set('trust proxy', process.env.TRUSTED_PROXY_IP ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) : 'loopback,uniquelocal');

  const pgPool = new pg.Pool(pgConfigFromEnv(process.env));
  const server = http.createServer(app);

  const { redisParams, redisUrl, redisPrefix } = redisConfigFromEnv(process.env);

  /**
   * @type {Object.<string, Array.<function(string): void>>}
   */
  const subs = {};

  const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
  const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
  const redisSubscribeClient = await redisUrlToClient(redisParams, redisUrl);
  const redisClient = await redisUrlToClient(redisParams, redisUrl);

  /**
   * @param {string[]} channels


@@ 1231,11 1233,10 @@ const startWorker = async (workerId) => {
  }, 30000);

  attachServerWithConfig(server, address => {
    log.warn(`Worker ${workerId} now listening on ${address}`);
    log.warn(`Streaming API now listening on ${address}`);
  });

  const onExit = () => {
    log.warn(`Worker ${workerId} exiting`);
    server.close();
    process.exit(0);
  };


@@ 1273,34 1274,4 @@ const attachServerWithConfig = (server, onSuccess) => {
  }
};

/**
 * @param {function(Error=): void} onSuccess
 */
const onPortAvailable = onSuccess => {
  const testServer = http.createServer();

  testServer.once('error', err => {
    onSuccess(err);
  });

  testServer.once('listening', () => {
    testServer.once('close', () => onSuccess());
    testServer.close();
  });

  attachServerWithConfig(testServer);
};

onPortAvailable(err => {
  if (err) {
    log.error('Could not start server, the port or socket is in use');
    return;
  }

  throng({
    workers: numWorkers,
    lifetime: Infinity,
    start: startWorker,
    master: startMaster,
  });
});
startServer();

M yarn.lock => yarn.lock +0 -17
@@ 2404,11 2404,6 @@
  dependencies:
    "@types/jest" "*"

"@types/throng@^4.0.2":
  version "4.0.2"
  resolved "https://registry.yarnpkg.com/@types/throng/-/throng-4.0.2.tgz#e352f5f86433e9dfbf3258414191852d25b2274f"
  integrity sha512-7tgh3R6vwtjj01URmhWXFSkWnm4wDJjsqLm8WPwIWadYjfsKAFi0HqabMQCU2JJ4TbeSGkb51qv27bBPN5Bubw==

"@types/tough-cookie@*":
  version "4.0.2"
  resolved "https://registry.yarnpkg.com/@types/tough-cookie/-/tough-cookie-4.0.2.tgz#6286b4c7228d58ab7866d19716f3696e03a09397"


@@ 7656,11 7651,6 @@ lodash.debounce@^4.0.8:
  resolved "https://registry.yarnpkg.com/lodash.debounce/-/lodash.debounce-4.0.8.tgz#82d79bff30a67c4005ffd5e2515300ad9ca4d7af"
  integrity sha1-gteb/zCmfEAF/9XiUVMArZyk168=

lodash.defaults@^4.0.1:
  version "4.2.0"
  resolved "https://registry.yarnpkg.com/lodash.defaults/-/lodash.defaults-4.2.0.tgz#d09178716ffea4dde9e5fb7b37f6f0802274580c"
  integrity sha1-0JF4cW/+pN3p5ft7N/bwgCJ0WAw=

lodash.get@^4.0:
  version "4.4.2"
  resolved "https://registry.yarnpkg.com/lodash.get/-/lodash.get-4.4.2.tgz#2d177f652fa31e939b4438d5341499dfa3825e99"


@@ 11141,13 11131,6 @@ text-table@^0.2.0:
  resolved "https://registry.yarnpkg.com/text-table/-/text-table-0.2.0.tgz#7f5ee823ae805207c00af2df4a84ec3fcfa570b4"
  integrity sha1-f17oI66AUgfACvLfSoTsP8+lcLQ=

throng@^4.0.0:
  version "4.0.0"
  resolved "https://registry.yarnpkg.com/throng/-/throng-4.0.0.tgz#983c6ba1993b58eae859998aa687ffe88df84c17"
  integrity sha1-mDxroZk7WOroWZmKpof/6I34TBc=
  dependencies:
    lodash.defaults "^4.0.1"

through@^2.3.8:
  version "2.3.8"
  resolved "https://registry.yarnpkg.com/through/-/through-2.3.8.tgz#0dd4c9ffaabc357960b1b724115d7e0e86a2e1f5"