From bd2699cdb4b67a0f15c00c4e3b516844fa950594 Mon Sep 17 00:00:00 2001 From: Neil Kakkar Date: Wed, 12 May 2021 15:35:26 +0100 Subject: [PATCH 1/3] add code walkthrough and prevent changing team IDs in plugins --- README.md | 68 ++++++++++++++++++++++++++++++++++++++- src/worker/plugins/run.ts | 9 ++++-- tests/plugins.test.ts | 48 +++++++++++++++++++++++++++ 3 files changed, 122 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index cc8ab3112..452701126 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Let's get you developing the plugin server in no time: 1. Install dependencies and prepare for takeoff by running command `yarn`. -1. Start a development instance of [PostHog](/PostHog/posthog). After all, this is the _PostHog_ Plugin Server, and it works in conjuction with the main server. To avoid interference, disable the plugin server there with `PLUGIN_SERVER_IDLE=true`. +1. Start a development instance of [PostHog](/PostHog/posthog). After all, this is the _PostHog_ Plugin Server, and it works in conjuction with the main server. To avoid interference, disable the plugin server there with setting the PLUGIN_SERVER_IDLE env variable before running. `PLUGIN_SERVER_IDLE=true ./bin/start` 1. Make sure that the plugin server is configured correctly (see [Configuration](#Configuration)). Two settings that you MUST get right are DATABASE_URL and REDIS_URL - they need to be identical between the plugin server and the main server. @@ -74,6 +74,72 @@ It's magic! Just bump up `version` in `package.json` on the main branch and the You can also use a `bump patch/minor/major` label on a PR - this will do the above for you when the PR is merged. Courtesy of GitHub Actions. +## Walkthrough + +The story begins with `pluginServer.ts -> startPluginServer`, which is the main thread of the plugin server. + +This main thread spawns 4 worker threads, managed using Piscina. Each worker thread runs 10 tasks (Q: what are tasks? - TASKS_PER_WORKER - a Piscina setting) + +### The main thread + +Let's talk about the main thread first. This has: + +1. `pubSub`: a Redis powered pubSub mechanism for reloading plugins whenever a message is published by the main PostHog app. + +2. `server`: sets up connections to required DBs and queues(clickhouse, Kafka, Postgres, Redis), via `server.ts -> createServer`. This is a shared setup between the main and worker threads + +3. `fastifyInstance`: sets up a web server. Unused for now, but may be used for enabling webhooks in the future. + +4. `piscina`: this is the thread manager. `makePiscina` creates the manager, while `createWorker` creates the worker threads. + +5. `scheduleControl`: The scheduled job controller. Responsible for adding piscina tasks for scheduled jobs, when the time comes. + Q: the schedule information somehow makes it into `server.pluginSchedule`. Unsure how, yet. + Comes in `vm.ts -> createPluginConfigVM -> __tasks`. + which is used here: `src/workers/plugins/setup.ts -> loadSchedule`. More about the vm internals in a bit. + +6. `jobQueueConsumer`: The internal job queue consumer. This enables retries, scheduling jobs in the future (once) (Note: this is the difference between `scheduleControl` and this internal `jobQueue`). While `scheduleControl` is triggered via `runEveryMinute`, `runEveryHour` tasks, the `jobQueueConsumer` deals with `meta.jobs.doX(event).runAt(new Date())`. + + Enqueuing jobs is managed by `job-queue-manager.ts`, which is backed by a Graphile-worker (`graphile-queue.ts`) + +7. `queue`: Wait, another queue? + +Side Note about Queues: + +Yes, there are a LOT of queues. Each of them serve a separate function. The one we've seen already is the graphile job queue. This is the internal one dealing with `job.runAt()` tasks. + +Then, there's the main ingestion queue, which sends events from PostHog to the plugin server. This is a Celery (backed by Redis) or Kafka queue, depending on the setup (Enterprise/high event volume is Kafka). These are consumed by the `queue` above, and sent off to the Piscina workers (`src/main/ingestion-queues/queue.ts -> ingestEvent`). Since all of the "real" stuff happens inside the worker threads, you'll find the specific ingestion code there (`src/worker/ingestion/ingest-event.ts`). This finally writes things into Postgres. + +It's also a good idea to see the producer side of this ingestion queue, which comes from `Posthog/posthog/api/capture.py`. There's several tasks in this queue, and our plugin server is only interested in one kind of task: `posthog.tasks.process_event.process_event_with_plugins`. + +### Worker threads + +That's all for the main thread. Onto the workers now: It all begins with `worker.ts` and `createWorker()` + +`server` is the same DB connections setup as in the main thread. + +What's new here is `setupPlugins` and `createTaskRunner`. + +1. `setupPlugins`: Does `loadPluginsFromDB` and then `loadPlugins` (which creates VMs lazily for each plugin) + +Q: Why does the pluginConfig have a vm? How exactly does this orchestration work? Are VMs created on the fly? What's the meaning of a "plugin installation step"? +Are these VMs persistent over lifetime of a thread? Do different threads have different VM setup (I hope not? doesn't make sense: I'd imagine every worker thread to have all plugins installed, so each VM on every thread is exactly the same, so every event can be processed on every worker). + +... Looks like every plugin gets its own VM, which persists till the worker is running. + +.. Hmm, also, there's multiple teams, each with their own configuration + set of enabled plugins. In that sense, it's probably a good idea to begin with a clean slate for each plugin+team id. + +2. `createTaskRunner`: There's some excellent wizardry happening here. `makePiscina`, runTask takes the arguments, which sets up the file to be run as `piscina.js` itself, using the `__filename` setup, returning `createWorker`, which is a function returning `createTaskRunner`, which is a [curried function](link), which given `{task, args}`, returns `workerTasks[task](server, args)`. These worker tasks are available in `src/worker/tasks.ts`. + +### Worker Lifecycle + +TODO: what happens with getPLuginRows, getPluginConfigRows and SetupPlugins. + +Q: Where is teamID populated? At event creation time? (in posthog/posthog? row.pk) + +### VM Internals + +TODO + ## Questions? ### [Join our Slack community.](posthog.com/slack) diff --git a/src/worker/plugins/run.ts b/src/worker/plugins/run.ts index 46e421924..7afe4148c 100644 --- a/src/worker/plugins/run.ts +++ b/src/worker/plugins/run.ts @@ -44,7 +44,8 @@ export async function runOnSnapshot(server: PluginsServer, event: PluginEvent): } export async function runProcessEvent(server: PluginsServer, event: PluginEvent): Promise { - const pluginsToRun = getPluginsForTeam(server, event.team_id) + const team_id = event.team_id + const pluginsToRun = getPluginsForTeam(server, team_id) let returnedEvent: PluginEvent | null = event const pluginsSucceeded = [] @@ -57,6 +58,10 @@ export async function runProcessEvent(server: PluginsServer, event: PluginEvent) try { returnedEvent = (await processEvent(returnedEvent)) || null + if (returnedEvent.team_id != team_id) { + returnedEvent = null // don't try to ingest events with modified teamIDs + throw new Error('Changing team IDs is an illegal operation') + } pluginsSucceeded.push(`${pluginConfig.plugin?.name} (${pluginConfig.id})`) } catch (error) { await processError(server, pluginConfig, error, returnedEvent) @@ -65,7 +70,7 @@ export async function runProcessEvent(server: PluginsServer, event: PluginEvent) } server.statsd?.timing(`plugin.process_event`, timer, { plugin: pluginConfig.plugin?.name ?? '?', - teamId: event.team_id.toString(), + teamId: team_id.toString(), }) if (!returnedEvent) { diff --git a/tests/plugins.test.ts b/tests/plugins.test.ts index 1ba7c83d9..4369d61d3 100644 --- a/tests/plugins.test.ts +++ b/tests/plugins.test.ts @@ -194,6 +194,54 @@ test('local plugin with broken index.js does not do much', async () => { unlink() }) +test('plugin changing teamID throws error', async () => { + getPluginRows.mockReturnValueOnce([ + mockPluginWithArchive(` + function processEvent (event, meta) { + event.team_id = 400 + return event } + `), + ]) + + getPluginConfigRows.mockReturnValueOnce([pluginConfig39]) + getPluginAttachmentRows.mockReturnValueOnce([]) + + await setupPlugins(mockServer) + const { pluginConfigs } = mockServer + + const event = { event: '$test', properties: {}, team_id: 2 } as PluginEvent + const returnedEvent = await runProcessEvent(mockServer, event) + + expect(returnedEvent).toEqual(null) + + expect(processError).toHaveBeenCalledWith( + mockServer, + pluginConfigs.get(39)!, + Error('Changing team IDs is an illegal operation'), + null + ) +}) + +test('plugin changing teamID prevents ingestion', async () => { + getPluginRows.mockReturnValueOnce([ + mockPluginWithArchive(` + function processEvent (event, meta) { + event.team_id = 400 + return event } + `), + ]) + + getPluginConfigRows.mockReturnValueOnce([pluginConfig39]) + getPluginAttachmentRows.mockReturnValueOnce([]) + + await setupPlugins(mockServer) + + const event = { event: '$test', properties: {}, team_id: 2 } as PluginEvent + const returnedEvent = await runProcessEvent(mockServer, event) + + expect(returnedEvent).toEqual(null) +}) + test('plugin throwing error does not prevent ingestion and failure is noted in event', async () => { // silence some spam console.log = jest.fn() From 1519b31ab660244017d62005d326e95aea71c476 Mon Sep 17 00:00:00 2001 From: Neil Kakkar Date: Wed, 12 May 2021 16:29:03 +0100 Subject: [PATCH 2/3] Update README.md --- README.md | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 5850caec4..303d24b2c 100644 --- a/README.md +++ b/README.md @@ -78,7 +78,7 @@ Courtesy of GitHub Actions. The story begins with `pluginServer.ts -> startPluginServer`, which is the main thread of the plugin server. -This main thread spawns 4 worker threads, managed using Piscina. Each worker thread runs 10 tasks (Q: what are tasks? - TASKS_PER_WORKER - a Piscina setting) +This main thread spawns 4 worker threads, managed using Piscina. Each worker thread runs 10 tasks.[1](#f1) ### The main thread @@ -93,9 +93,8 @@ Let's talk about the main thread first. This has: 4. `piscina`: this is the thread manager. `makePiscina` creates the manager, while `createWorker` creates the worker threads. 5. `scheduleControl`: The scheduled job controller. Responsible for adding piscina tasks for scheduled jobs, when the time comes. - Q: the schedule information somehow makes it into `server.pluginSchedule`. Unsure how, yet. - Comes in `vm.ts -> createPluginConfigVM -> __tasks`. - which is used here: `src/workers/plugins/setup.ts -> loadSchedule`. More about the vm internals in a bit. + The schedule information makes it into `server.pluginSchedule` via `vm.ts -> createPluginConfigVM -> __tasks`, which parses for `runEvery*` tasks, and + then used in `src/workers/plugins/setup.ts -> loadSchedule`. More about the vm internals in a bit. 6. `jobQueueConsumer`: The internal job queue consumer. This enables retries, scheduling jobs in the future (once) (Note: this is the difference between `scheduleControl` and this internal `jobQueue`). While `scheduleControl` is triggered via `runEveryMinute`, `runEveryHour` tasks, the `jobQueueConsumer` deals with `meta.jobs.doX(event).runAt(new Date())`. @@ -119,16 +118,9 @@ That's all for the main thread. Onto the workers now: It all begins with `worker What's new here is `setupPlugins` and `createTaskRunner`. -1. `setupPlugins`: Does `loadPluginsFromDB` and then `loadPlugins` (which creates VMs lazily for each plugin) +1. `setupPlugins`: Does `loadPluginsFromDB` and then `loadPlugins` (which creates VMs lazily for each plugin+team). TeamID represents a company using plugins, and each team can have it's own set of plugins enabled. The PluginConfig shows which team the config belongs to, the plugin to run, and the VM to run it in. -Q: Why does the pluginConfig have a vm? How exactly does this orchestration work? Are VMs created on the fly? What's the meaning of a "plugin installation step"? -Are these VMs persistent over lifetime of a thread? Do different threads have different VM setup (I hope not? doesn't make sense: I'd imagine every worker thread to have all plugins installed, so each VM on every thread is exactly the same, so every event can be processed on every worker). - -... Looks like every plugin gets its own VM, which persists till the worker is running. - -.. Hmm, also, there's multiple teams, each with their own configuration + set of enabled plugins. In that sense, it's probably a good idea to begin with a clean slate for each plugin+team id. - -2. `createTaskRunner`: There's some excellent wizardry happening here. `makePiscina`, runTask takes the arguments, which sets up the file to be run as `piscina.js` itself, using the `__filename` setup, returning `createWorker`, which is a function returning `createTaskRunner`, which is a [curried function](link), which given `{task, args}`, returns `workerTasks[task](server, args)`. These worker tasks are available in `src/worker/tasks.ts`. +2. `createTaskRunner`: There's some excellent wizardry happening here. `makePiscina` of `piscina.js` sets up the workers to run the existing file itself (using `__filename` in the setup config, returning `createWorker()`. This `createWorker()` is a function returning `createTaskRunner`, which is a [curried function](https://javascript.info/currying-partials), which given `{task, args}`, returns `workerTasks[task](server, args)`. These worker tasks are available in `src/worker/tasks.ts`. ### Worker Lifecycle @@ -140,6 +132,10 @@ Q: Where is teamID populated? At event creation time? (in posthog/posthog? row.p TODO +### End Notes + +1: What are tasks? - TASKS_PER_WORKER - a Piscina setting (https://github.com/piscinajs/piscina#constructor-new-piscinaoptions) -> concurrentTasksPerWorker + ## Questions? ### [Join our Slack community.](posthog.com/slack) From eb7d9f80c04d94d979dadc302ca48be1037faf91 Mon Sep 17 00:00:00 2001 From: Neil Kakkar Date: Thu, 13 May 2021 10:22:19 +0100 Subject: [PATCH 3/3] address comments --- src/worker/plugins/run.ts | 10 +++++----- tests/plugins.test.ts | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/worker/plugins/run.ts b/src/worker/plugins/run.ts index 7afe4148c..ede3a0f0d 100644 --- a/src/worker/plugins/run.ts +++ b/src/worker/plugins/run.ts @@ -44,8 +44,8 @@ export async function runOnSnapshot(server: PluginsServer, event: PluginEvent): } export async function runProcessEvent(server: PluginsServer, event: PluginEvent): Promise { - const team_id = event.team_id - const pluginsToRun = getPluginsForTeam(server, team_id) + const teamId = event.team_id + const pluginsToRun = getPluginsForTeam(server, teamId) let returnedEvent: PluginEvent | null = event const pluginsSucceeded = [] @@ -58,9 +58,9 @@ export async function runProcessEvent(server: PluginsServer, event: PluginEvent) try { returnedEvent = (await processEvent(returnedEvent)) || null - if (returnedEvent.team_id != team_id) { + if (returnedEvent.team_id != teamId) { returnedEvent = null // don't try to ingest events with modified teamIDs - throw new Error('Changing team IDs is an illegal operation') + throw new Error('Illegal Operation: Plugin tried to change teamID') } pluginsSucceeded.push(`${pluginConfig.plugin?.name} (${pluginConfig.id})`) } catch (error) { @@ -70,7 +70,7 @@ export async function runProcessEvent(server: PluginsServer, event: PluginEvent) } server.statsd?.timing(`plugin.process_event`, timer, { plugin: pluginConfig.plugin?.name ?? '?', - teamId: team_id.toString(), + teamId: teamId.toString(), }) if (!returnedEvent) { diff --git a/tests/plugins.test.ts b/tests/plugins.test.ts index 66c78bef1..4bd28694d 100644 --- a/tests/plugins.test.ts +++ b/tests/plugins.test.ts @@ -219,7 +219,7 @@ test('plugin changing teamID throws error', async () => { expect(processError).toHaveBeenCalledWith( mockServer, pluginConfigs.get(39)!, - Error('Changing team IDs is an illegal operation'), + Error('Illegal Operation: Plugin tried to change teamID'), null ) })