diff --git a/README.md b/README.md
index ff4f2575..303d24b2 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@ Let's get you developing the plugin server in no time:
 
 1. Install dependencies and prepare for takeoff by running command `yarn`.
 
-1. Start a development instance of [PostHog](/PostHog/posthog). After all, this is the _PostHog_ Plugin Server, and it works in conjuction with the main server. To avoid interference, disable the plugin server there with `PLUGIN_SERVER_IDLE=true`.
+1. Start a development instance of [PostHog](/PostHog/posthog). After all, this is the _PostHog_ Plugin Server, and it works in conjunction with the main server. To avoid interference, disable the plugin server there by setting the PLUGIN_SERVER_IDLE env variable before starting it: `PLUGIN_SERVER_IDLE=true ./bin/start`
 
 1. Make sure that the plugin server is configured correctly (see [Configuration](#Configuration)). Two settings that you MUST get right are DATABASE_URL and REDIS_URL - they need to be identical between the plugin server and the main server.
 
@@ -74,6 +74,68 @@ It's magic! Just bump up `version` in `package.json` on the main branch and the
 
 You can also use a `bump patch/minor/major` label on a PR - this will do the above for you when the PR is merged. Courtesy of GitHub Actions.
 
+## Walkthrough
+
+The story begins with `pluginServer.ts -> startPluginServer`, which is the main thread of the plugin server.
+
+This main thread spawns 4 worker threads, managed using Piscina. Each worker thread runs 10 tasks.[1](#f1)
+
+### The main thread
+
+Let's talk about the main thread first. This has:
+
+1. `pubSub`: a Redis-powered pub/sub mechanism for reloading plugins whenever a message is published by the main PostHog app.
+
+2. `server`: sets up connections to the required DBs and queues (ClickHouse, Kafka, Postgres, Redis) via `server.ts -> createServer`. This setup is shared between the main and worker threads.
+
+3. `fastifyInstance`: sets up a web server. Unused for now, but may be used for enabling webhooks in the future.
+
+4. `piscina`: this is the thread manager. `makePiscina` creates the manager, while `createWorker` creates the worker threads.
+
+5. `scheduleControl`: the scheduled job controller, responsible for adding Piscina tasks for scheduled jobs when the time comes.
+    The schedule information makes it into `server.pluginSchedule` via `vm.ts -> createPluginConfigVM -> __tasks`, which parses for `runEvery*` tasks, and
+    is then used in `src/worker/plugins/setup.ts -> loadSchedule`. More about the VM internals in a bit.
+
+6. `jobQueueConsumer`: the internal job queue consumer. This enables retries and one-off jobs scheduled for the future, which is what distinguishes it from `scheduleControl`: while `scheduleControl` triggers recurring `runEveryMinute`/`runEveryHour` tasks, the `jobQueueConsumer` deals with `meta.jobs.doX(event).runAt(new Date())` (see the sketch at the end of this section).
+
+    Enqueuing jobs is managed by `job-queue-manager.ts`, which is backed by a Graphile worker (`graphile-queue.ts`).
+
+7. `queue`: Wait, another queue?
+
+Side Note about Queues:
+
+Yes, there are a LOT of queues. Each of them serves a separate function. The one we've seen already is the Graphile job queue: the internal one dealing with `job.runAt()` tasks.
+
+Then there's the main ingestion queue, which sends events from PostHog to the plugin server. This is a Celery (backed by Redis) or Kafka queue, depending on the setup (Enterprise/high event volume setups use Kafka). These are consumed by the `queue` above and sent off to the Piscina workers (`src/main/ingestion-queues/queue.ts -> ingestEvent`). Since all of the "real" stuff happens inside the worker threads, you'll find the specific ingestion code there (`src/worker/ingestion/ingest-event.ts`). This finally writes things into Postgres.
+
+It's also a good idea to see the producer side of this ingestion queue, which lives in `PostHog/posthog/api/capture.py`. There are several tasks in this queue, and our plugin server is only interested in one kind of task: `posthog.tasks.process_event.process_event_with_plugins`.
+
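+Before moving on to the workers, here is roughly what the split described in point 6 looks like from a plugin author's point of view. This is a hypothetical sketch, not code from this repo: the `doSomething` job name and the `jobs` export are illustrative, and only the `runEvery*` and `meta.jobs.doX(event).runAt(new Date())` shapes come from the description above.
+
+```ts
+// Recurring work: picked up by scheduleControl and dispatched as a Piscina task.
+export function runEveryMinute(meta: any): void {
+    console.log('runs on a schedule, once a minute')
+}
+
+// One-off (possibly delayed) work: enqueued onto the Graphile-backed job queue,
+// to be picked up later by jobQueueConsumer.
+export async function processEvent(event: any, meta: any): Promise<any> {
+    await meta.jobs.doSomething(event).runAt(new Date())
+    return event
+}
+
+// The job implementation itself (illustrative shape, not a documented contract).
+export const jobs = {
+    doSomething: async (event: any, meta: any) => {
+        console.log('ran from the internal job queue for', event.event)
+    },
+}
+```
+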
+### Worker threads
+
+That's all for the main thread. Onto the workers now: it all begins with `worker.ts` and `createWorker()`.
+
+`server` is the same DB-connection setup as in the main thread.
+
+What's new here is `setupPlugins` and `createTaskRunner`.
+
+1. `setupPlugins`: does `loadPluginsFromDB` and then `loadPlugins` (which creates VMs lazily for each plugin+team). The team ID represents a company using plugins, and each team can have its own set of plugins enabled. The PluginConfig shows which team the config belongs to, which plugin to run, and the VM to run it in.
+
+2. `createTaskRunner`: there's some excellent wizardry happening here. `makePiscina` of `piscina.js` sets up the workers to run this same file (using `__filename` in the setup config), which returns `createWorker()`. This `createWorker()` is a function returning `createTaskRunner`, which is a [curried function](https://javascript.info/currying-partials): given `{ task, args }`, it returns `workerTasks[task](server, args)`. These worker tasks are available in `src/worker/tasks.ts`. A simplified sketch of this pattern follows below.
+
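+The following is a minimal, self-contained sketch of that currying pattern. The `PluginsServer` type and the `workerTasks` map here are simplified stand-ins, not the real definitions from `worker.ts` or `src/worker/tasks.ts`:
+
+```ts
+type PluginsServer = { name: string } // stand-in for the real server object
+type TaskPayload = { task: string; args: any }
+
+// stand-in for the task map in src/worker/tasks.ts
+const workerTasks: Record<string, (server: PluginsServer, args: any) => Promise<any>> = {
+    hello: async (server, args) => `${server.name} says hello to ${args}`,
+}
+
+// curried: first bind the server, then accept { task, args } payloads from Piscina
+const createTaskRunner =
+    (server: PluginsServer) =>
+    ({ task, args }: TaskPayload): Promise<any> =>
+        workerTasks[task](server, args)
+
+// roughly what each worker thread ends up exposing to Piscina
+const taskRunner = createTaskRunner({ name: 'worker-1' })
+taskRunner({ task: 'hello', args: 'world' }).then(console.log) // -> "worker-1 says hello to world"
+```
+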
+### Worker Lifecycle
+
+TODO: what happens with `getPluginRows`, `getPluginConfigRows` and `setupPlugins`.
+
+Q: Where is the team ID populated? At event creation time? (in PostHog/posthog? `row.pk`)
+
+### VM Internals
+
+TODO
+
+### End Notes
+
+<b id="f1">1</b>: What are tasks? `TASKS_PER_WORKER` is passed to Piscina as its `concurrentTasksPerWorker` option ([constructor options](https://github.com/piscinajs/piscina#constructor-new-piscinaoptions)).
+
 ## Questions?
 
 ### [Join our Slack community.](posthog.com/slack)
diff --git a/src/worker/plugins/run.ts b/src/worker/plugins/run.ts
index 46e42192..ede3a0f0 100644
--- a/src/worker/plugins/run.ts
+++ b/src/worker/plugins/run.ts
@@ -44,7 +44,8 @@ export async function runOnSnapshot(server: PluginsServer, event: PluginEvent):
 }
 
 export async function runProcessEvent(server: PluginsServer, event: PluginEvent): Promise<PluginEvent | null> {
-    const pluginsToRun = getPluginsForTeam(server, event.team_id)
+    const teamId = event.team_id
+    const pluginsToRun = getPluginsForTeam(server, teamId)
     let returnedEvent: PluginEvent | null = event
 
     const pluginsSucceeded = []
@@ -57,6 +58,10 @@ export async function runProcessEvent(server: PluginsServer, event: PluginEvent)
 
         try {
             returnedEvent = (await processEvent(returnedEvent)) || null
+            if (returnedEvent && returnedEvent.team_id !== teamId) {
+                returnedEvent = null // don't try to ingest events with modified team IDs
+                throw new Error('Illegal Operation: Plugin tried to change teamID')
+            }
             pluginsSucceeded.push(`${pluginConfig.plugin?.name} (${pluginConfig.id})`)
         } catch (error) {
             await processError(server, pluginConfig, error, returnedEvent)
@@ -65,7 +70,7 @@ export async function runProcessEvent(server: PluginsServer, event: PluginEvent)
         }
         server.statsd?.timing(`plugin.process_event`, timer, {
             plugin: pluginConfig.plugin?.name ?? '?',
-            teamId: event.team_id.toString(),
+            teamId: teamId.toString(),
         })
 
         if (!returnedEvent) {
diff --git a/tests/plugins.test.ts b/tests/plugins.test.ts
index 60b486c5..4bd28694 100644
--- a/tests/plugins.test.ts
+++ b/tests/plugins.test.ts
@@ -196,6 +196,54 @@ test('local plugin with broken index.js does not do much', async () => {
     unlink()
 })
 
+test('plugin changing teamID throws error', async () => {
+    getPluginRows.mockReturnValueOnce([
+        mockPluginWithArchive(`
+            function processEvent (event, meta) {
+                event.team_id = 400
+                return event }
+        `),
+    ])
+
+    getPluginConfigRows.mockReturnValueOnce([pluginConfig39])
+    getPluginAttachmentRows.mockReturnValueOnce([])
+
+    await setupPlugins(mockServer)
+    const { pluginConfigs } = mockServer
+
+    const event = { event: '$test', properties: {}, team_id: 2 } as PluginEvent
+    const returnedEvent = await runProcessEvent(mockServer, event)
+
+    expect(returnedEvent).toEqual(null)
+
+    expect(processError).toHaveBeenCalledWith(
+        mockServer,
+        pluginConfigs.get(39)!,
+        Error('Illegal Operation: Plugin tried to change teamID'),
+        null
+    )
+})
+
+test('plugin changing teamID prevents ingestion', async () => {
+    getPluginRows.mockReturnValueOnce([
+        mockPluginWithArchive(`
+            function processEvent (event, meta) {
+                event.team_id = 400
+                return event }
+        `),
+    ])
+
+    getPluginConfigRows.mockReturnValueOnce([pluginConfig39])
+    getPluginAttachmentRows.mockReturnValueOnce([])
+
+    await setupPlugins(mockServer)
+
+    const event = { event: '$test', properties: {}, team_id: 2 } as PluginEvent
+    const returnedEvent = await runProcessEvent(mockServer, event)
+
+    expect(returnedEvent).toEqual(null)
+})
+
 test('plugin throwing error does not prevent ingestion and failure is noted in event', async () => {
     // silence some spam
     console.log = jest.fn()