Implement total driver system + detach River from pgx/v5#212
Conversation
772ee03 to
67b1c05
Compare
|
@bgentry Damn, I know this changeset is absolutely brutal, sorry, but if you want to start mulling it over. |
67b1c05 to
745dce1
Compare
bgentry
left a comment
There was a problem hiding this comment.
Phew 😅 I still have quite a few of the larger bits to review in here, but I've cleared off a lot of the higher level parts and all of the tiny stuff. I'll need one more pass to get through this I think.
For now, here are some initial things I noticed / wanted to discuss.
| Rollback(ctx context.Context) error | ||
| } | ||
|
|
||
| // Listner listens for notifications. In Postgres, this is a database connection |
There was a problem hiding this comment.
| // Listner listens for notifications. In Postgres, this is a database connection | |
| // Listener listens for notifications. In Postgres, this is a database connection |
| // rows. For example, attempting to cancel a job that doesn't exist will | ||
| // return this error. | ||
| ErrNotFound = errors.New("not found") | ||
| ErrNotFound = rivertype.ErrNotFound |
There was a problem hiding this comment.
Got thrown off at first trying to figure out how you were still returning the top level ErrNotFound but now I see it. I think this makes sense—if it's the same actual error value across packages then it will always be considered equal and errors.Is() checks will succeed, right?
Only downside is the extra layer of indirection figuring out what the aliased value actually is, but I don't think that matters too much in the case of an error like this.
There was a problem hiding this comment.
Yeah, errors.Is still works because it's a type alias (with =).
I would've slightly preferred to not have it at the top-level river and only be in rivertype, but added this alias for backwards compatibility (although this is a change that maybe we could get away with). It needs to be in rivertype so lower level packages can reference it.
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return c.jobCancel(ctx, c.driver.UnwrapExecutor(tx), jobID) |
There was a problem hiding this comment.
I love that these are refactored to share all their logic, thank you!
There was a problem hiding this comment.
Do you think it makes sense for this to be tucked away into a driver-specific package? Is the idea that the serialization & encoding is driver-specific?
The reason I'm asking is because some recent UI work has made me think about whether we might want to stuff more info in here. For example at gives us just the time the error occurred, but we don't have any info about how long it took for the job to run on that attempt. We have to be mindful of adding too much in here but there might still be some additional bits worth the overhead.
There was a problem hiding this comment.
Nope. The fact that it's in the driver is purely a holdover from the sqlc.yaml type override that was already in there. I'm totally good with errors being pushed up a level — it'd remove duplicate code at the driver layer that's not very valuable right now.
| for { | ||
| // Wrapped in a function so that defers run as expected. | ||
| numScheduled, err := func() (int64, error) { | ||
| numScheduled, err := func() (int, error) { |
There was a problem hiding this comment.
oh come on, you're not gonna schedule 2.1 billion jobs in one query?? 😢
There was a problem hiding this comment.
Haha, I figure anyone big and powerful enough to be scheduling 2.1B jobs will also be rich enough to afford 64-bit arch!
| // Kinds returns an updated filter set that will only return jobs of the given | ||
| // kinds. | ||
| func (p *JobListParams) Kinds(kinds ...string) *JobListParams { | ||
| result := p.copy() | ||
| result.kinds = make([]string, 0, len(kinds)) | ||
| copy(result.kinds, kinds) | ||
| return result | ||
| } |
There was a problem hiding this comment.
A bit hard to tell, but I think this addition might be missing test coverage?
There was a problem hiding this comment.
Yeah, added. And actually, it didn't work :)
TBF, I'd copied the implementation from Queues, which also didn't have a test, and also didn't work. Corrected that one too, and put a note in the changelog.
| go 1.21.4 | ||
|
|
||
| require github.com/jackc/pgx/v5 v5.5.0 | ||
| replace github.com/riverqueue/river/rivertype => ../rivertype |
There was a problem hiding this comment.
Is this meant to stay in here?
There was a problem hiding this comment.
Yeah, I believe this is right. All the pgx stuff disappears, but we still need a reference to rivertype for structs like JobRow.
| # This one is necessary because `args` is nullable (this seems to have | ||
| # been an oversight, but one we're determined isn't worth correcting | ||
| # for now), and the `database/sql` variant of sqlc will give it a | ||
| # crazy type by default, so here we give it something more reasonable. |
There was a problem hiding this comment.
Can this not go in a shared util package somewhere?
There was a problem hiding this comment.
Ugh, it could be, but we'd have to Go module-ify river/internal/util :/
But actually, since it was just this one function that made it into riverdriverutil, I just removed this package and duplicated the helper into each driver as an internal mapSlice. I think this is a little better since I don't love that this new utility package.
If more utilities need to be shared later on, we can revisit this later.
745dce1 to
19156b4
Compare
bgentry
left a comment
There was a problem hiding this comment.
Well, gotta be the easiest > 10k line PR I've ever reviewed 🤣 Really great work on this. It might help to push any subsequent changes into additional commits to make them even easier to re-review. Most of this commentary is pretty minor, should be good to go after this 👏
| -- name: JobGetByKindMany :many | ||
| SELECT * | ||
| FROM river_job | ||
| WHERE kind = any(@kind::text[]) | ||
| ORDER BY id; | ||
|
|
||
| -- name: JobGetByKindAndUniqueProperties :one |
There was a problem hiding this comment.
Missed the alphabetization on these two
| var driver TDriver | ||
| jobRow, err := driver.UnwrapExecutor(tx).JobSetStateIfRunning(ctx, riverdriver.JobSetStateCompleted(job.ID, time.Now())) |
There was a problem hiding this comment.
This is a small behavioral change due to the differences in the underlying queries, but I don't think it's a big issue. You really shouldn't be using this to set a job as complete if it's not actually running.
There was a problem hiding this comment.
Yeah true, but I think you're right in that it won't matter. If being called in a Work function then the job should be running unless something weird is going on. Feels good to get rid of a big SQL blob.
|
|
||
| func jobRowFromInternal(internal *dbsqlc.RiverJob) *rivertype.JobRow { | ||
| return &rivertype.JobRow{ | ||
| ID: internal.ID, |
There was a problem hiding this comment.
May as well fully sort this struct declaration now, same for migrationFromInternal below
There was a problem hiding this comment.
I think it is. I'm sort of using the convention of ID at the top, and then alphabetical from there on out.
| t.Run("JobGetByIDMany", func(t *testing.T) { | ||
| t.Parallel() | ||
| }) | ||
|
|
||
| t.Run("JobGetByKindAndUniqueProperties", func(t *testing.T) { | ||
| t.Parallel() | ||
| }) | ||
|
|
||
| t.Run("JobGetByKindMany", func(t *testing.T) { | ||
| t.Parallel() | ||
| }) | ||
|
|
||
| t.Run("JobGetStuck", func(t *testing.T) { | ||
| t.Parallel() | ||
| }) | ||
|
|
||
| t.Run("JobInsert", func(t *testing.T) { | ||
| t.Parallel() | ||
| }) |
There was a problem hiding this comment.
Are these still meant to be empty? same comment for the other empty ones in here
There was a problem hiding this comment.
Yep, caught me. These were for functions where there were no equivalents from adapter_test.go, and I'd refactored so many tests already that I'd left these for an exercise for the future.
That said, had some time this evening, so I filled them all in. The test bootstrap here should actually be pretty good for testing a theoretical future driver now, even without having to put it in a real client. I also found a couple things that could be considered bugs, like some timestamps not being returned back as UTC() when they probably should've been.
You might want to go through eventually and double-check that you're happy with all the conditions being checked. A few could probably use a couple more cases.
532a05c to
16fd58f
Compare
A small one to upgrade: * golangci-lint GitHub Action from `v4` to `v5` * golangci-lint from 1.55 to 1.56 A small change, but I upgraded locally at some point, and it's causing a build issue in #212. The `tparallel` lint previously had a bug where it'd want you to put a `t.Parallel()` on test cases that use `t.Setenv()`, evening though use of `t.Parallel()` with `t.Setenv()` is disallowed by Go's test framework, so you'd have to mark those test cases with `nolint`. That bug's been fixed now, and now the linter detects that the `nolint` is no longer necessary, and automatically strips it out with my upgraded golangci-lint version, causing a failure when I push to CI. The `v5` GitHub Action is also somewhat nice because it stops using a deprecated NodeJS version, so produces fewer warnings in the log.
A small one to upgrade: * golangci-lint GitHub Action from `v4` to `v5` * golangci-lint from 1.55 to 1.56 A small change, but I upgraded locally at some point, and it's causing a build issue in #212. The `tparallel` lint previously had a bug where it'd want you to put a `t.Parallel()` on test cases that use `t.Setenv()`, evening though use of `t.Parallel()` with `t.Setenv()` is disallowed by Go's test framework, so you'd have to mark those test cases with `nolint`. That bug's been fixed now, and now the linter detects that the `nolint` is no longer necessary, and automatically strips it out with my upgraded golangci-lint version, causing a failure when I push to CI. The `v5` GitHub Action is also somewhat nice because it stops using a deprecated NodeJS version, so produces fewer warnings in the log.
A small one to upgrade: * golangci-lint GitHub Action from `v4` to `v5` * golangci-lint from 1.55 to 1.56 A small change, but I upgraded locally at some point, and it's causing a build issue in #212. The `tparallel` lint previously had a bug where it'd want you to put a `t.Parallel()` on test cases that use `t.Setenv()`, evening though use of `t.Parallel()` with `t.Setenv()` is disallowed by Go's test framework, so you'd have to mark those test cases with `nolint`. That bug's been fixed now, and now the linter detects that the `nolint` is no longer necessary, and automatically strips it out with my upgraded golangci-lint version, causing a failure when I push to CI. The `v5` GitHub Action is also somewhat nice because it stops using a deprecated NodeJS version, so produces fewer warnings in the log.
16fd58f to
f3637b9
Compare
A small one to upgrade: * golangci-lint GitHub Action from `v4` to `v5` * golangci-lint from 1.55 to 1.56 A small change, but I upgraded locally at some point, and it's causing a build issue in #212. The `tparallel` lint previously had a bug where it'd want you to put a `t.Parallel()` on test cases that use `t.Setenv()`, even though use of `t.Parallel()` with `t.Setenv()` is disallowed by Go's test framework, so you'd have to mark those test cases with `nolint`. That bug's been fixed now, and now the linter detects that the `nolint` is no longer necessary, and automatically strips it out with my upgraded golangci-lint version, causing a failure when I push to CI. The `v5` GitHub Action is also somewhat nice because it stops using a deprecated NodeJS version, so produces fewer warnings in the log.
e059336 to
bcff31c
Compare
|
@bgentry Thanks for the great feedback here. After one more major push tonight, I think I was able to address all feedback, and have responded to all comments above. (And damn, just responding to the review of this PR felt like a big project — ty for writing all that up.) 95% of my responses are agreeing + fixing, but you should probably take another quick scan through anyway. The lint CI is failing because of the problem described in #223, but the build should be good aside from that. The suite's picked up another few dozen tests since the last time you looked even, so I'm feeling pretty good about it. Also added changelog notes for everything. |
5f8d648 to
5fe26b9
Compare
A small one to upgrade: * golangci-lint GitHub Action from `v4` to `v5` * golangci-lint from 1.55 to 1.56 A small change, but I upgraded locally at some point, and it's causing a build issue in #212. The `tparallel` lint previously had a bug where it'd want you to put a `t.Parallel()` on test cases that use `t.Setenv()`, even though use of `t.Parallel()` with `t.Setenv()` is disallowed by Go's test framework, so you'd have to mark those test cases with `nolint`. That bug's been fixed now, and now the linter detects that the `nolint` is no longer necessary, and automatically strips it out with my upgraded golangci-lint version, causing a failure when I push to CI. The `v5` GitHub Action is also somewhat nice because it stops using a deprecated NodeJS version, so produces fewer warnings in the log.
0dcbca0 to
5749b1c
Compare
5749b1c to
55183b5
Compare
| } | ||
| } | ||
|
|
||
| t.Run("FiltersByKind", func(t *testing.T) { //nolint:dupl |
There was a problem hiding this comment.
heh, this linter can get pretty annoying in tests.
There was a problem hiding this comment.
Yeah ... we might want to just disable it. So far I've never seen a situation where it's done anything useful.
| require.Equal(t, []int64{job3.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID })) | ||
| }) | ||
|
|
||
| t.Run("FiltersByQueue", func(t *testing.T) { //nolint:dupl |
There was a problem hiding this comment.
Thanks for adding this and fixing the issue 🙏
|
|
||
| exec, _ := setupExecutor(ctx, t, driver, beginTx) | ||
|
|
||
| job, err := exec.JobGetByID(ctx, 99999) |
There was a problem hiding this comment.
realized from some of our other tests that we can just use 0 here and have no possibility of a flaky test ever
| job, err := exec.JobGetByID(ctx, 99999) | |
| job, err := exec.JobGetByID(ctx, 0) |
There was a problem hiding this comment.
Sweet fixed. There were a total of four instances of 9999 lookups between the riverdrivertest stuff I added and client_test. Updated all of them.
Previously, we implement a basic driver interface in River such that no
exported River APIs depended on `pgx/v5` directly, with the idea being
that although only one database driver was actually supported, it'd give
us the leeway we needed to fully implement a multi-driver system later.
Under the covers, the drivers were a hack, and `GetDBPool` just returned
the wrapped pgx database pool back to the client and other packages that
wanted to access it.
This change takes the initial driver push and carries it all the way.
The driver API becomes fully fleshed out with all database operations
River needs to operate, and the top-level River module drops internal
use of `pgx/v5` completely (it's still used in tests, but no non-test
packages).
If `pgx/v6` were to be released tomorrow, this would allow us to add
support for it quite easily. It also allows us to theoretically fully
implement the `databasesql` driver, or to implement drivers for other
databases like MySQL and SQLite (it'd be work to get that done, but
the driver API is generic enough to make it possible).
Notably, the `dbadapter` package goes away because the River driver API
becomes a very similar concept to it, with interface functions for major
operations. A difference is that given we'd like to implement multiple
drivers, the driver code should stay as lean as possible so that we
don't have to reimplement it over and over again, so any higher level
logic that could be extracted from `StandardAdapter` was, including:
* About half of the list logic was already in `dblist`, but we move the
other half too so that all list logic is now packaged together.
* Uniqueness logic moves to a new `dbunique` package.
`dbadapter` tests move into a new module `riverdrivertest` that provides
some helpers that easily allow us to run the full barrage against
`riverpgxv5`, but also any new drivers that we might add in the future.
(`riverdatabasesql` is also tested, but uses a different helper that
only checks its support for migrations.)
Possibly the trickiest incision was moving listen/notify logic over to
the driver, which picks up a new `GetListener()` function that returns
this `Listener` interface:
// Listner listens for notifications. In Postgres, this is a database connection
// where `LISTEN` has been run.
//
// API is not stable. DO NOT IMPLEMENT.
type Listener interface {
Close(ctx context.Context) error
Connect(ctx context.Context) error
Listen(ctx context.Context, topic string) error
Ping(ctx context.Context) error
Unlisten(ctx context.Context, topic string) error
WaitForNotification(ctx context.Context) (*Notification, error)
}
This is backed by a connection pool and `LISTEN` for `riverpgxv5`, but
the idea is that for databases that don't support listen/notify, it's
generic enough that it could be reimplemented as something else under
the hood, like say an unlogged table of some sort.
`river/riverdriver` becomes its own Go module. Previously, we'd gotten
away without referencing it from driver implementations, but the
implementations now reference it for parameter types and some interfaces
and return values. Like most other modules, modules that need to
reference `riverdriver` use `replace` in their `go.mod`s, so I don't
expect the extra module to add any extra headache to our releases.
Discerning readers may notice that driver implementations reference some
types in `riverdriver` and other types in `rivertypes`. I'm defining
which is which as: `riverdriver` should contain anything unstable that
we're allowed to change and which a user shouldn't be accessing anyway
(e.g. say function parameter structs). `rivertypes` is for specific
constructs that external River users will need to reference like
`JobRow`.
Lastly, I'll say that this isn't final. I've refactored a lot of tests
to ensure that this patch doesn't have any major outstanding TODOs and
doesn't leave the code any worse that when it started, but there are a
lot of follow up improvements that I can potentially make, and expect
to.
55183b5 to
501c484
Compare
|
TY man! |
As of #212, `rivertype` became a separate Go module from the mainline `river`. Here, add it to the release instructions so that it gets properly tagged along with other all other modules. Also add it to the `update-submodule-versions` program. This isn't strictly necessary because `rivertype` doesn't have any submodule versions to update, but it feels like it should be there for completeness just in case, and also because it makes the list in the program a complete list of all Go modules in the project.
As of #212, `rivertype` became a separate Go module from the mainline `river`. Here, add it to the release instructions so that it gets properly tagged along with other all other modules. Also add it to the `update-submodule-versions` program. This isn't strictly necessary because `rivertype` doesn't have any submodule versions to update, but it feels like it should be there for completeness just in case, and also because it makes the list in the program a complete list of all Go modules in the project.
Previously, we implement a basic driver interface in River such that no
exported River APIs depended on
pgx/v5directly, with the idea beingthat although only one database driver was actually supported, it'd give
us the leeway we needed to fully implement a multi-driver system later.
Under the covers, the drivers were a hack, and
GetDBPooljust returnedthe wrapped pgx database pool back to the client and other packages that
wanted to access it.
This change takes the initial driver push and carries it all the way.
The driver API becomes fully fleshed out with all database operations
River needs to operate, and the top-level River module drops internal
use of
pgx/v5completely (it's still used in tests, but no non-testpackages).
If
pgx/v6were to be released tomorrow, this would allow us to addsupport for it quite easily. It also allows us to theoretically fully
implement the
databasesqldriver, or to implement drivers for otherdatabases like MySQL and SQLite (it'd be work to get that done, but
the driver API is generic enough to make it possible).
Notably, the
dbadapterpackage goes away because the River driver APIbecomes a very similar concept to it, with interface functions for major
operations. A difference is that given we'd like to implement multiple
drivers, the driver code should stay as lean as possible so that we
don't have to reimplement it over and over again, so any higher level
logic that could be extracted from
StandardAdapterwas, including:About half of the list logic was already in
dblist, but we move theother half too so that all list logic is now packaged together.
Uniqueness logic moves to a new
dbuniquepackage.dbadaptertests move into a new moduleriverdrivertestthat providessome helpers that easily allow us to run the full barrage against
riverpgxv5, but also any new drivers that we might add in the future.(
riverdatabasesqlis also tested, but uses a different helper thatonly checks its support for migrations.)
Possibly the trickiest incision was moving listen/notify logic over to
the driver, which picks up a new
GetListener()function that returnsthis
Listenerinterface:This is backed by a connection pool and
LISTENforriverpgxv5, butthe idea is that for databases that don't support listen/notify, it's
generic enough that it could be reimplemented as something else under
the hood, like say an unlogged table of some sort.
river/riverdriverbecomes its own Go module. Previously, we'd gottenaway without referencing it from driver implementations, but the
implementations now reference it for parameter types and some interfaces
and return values. Like most other modules, modules that need to
reference
riverdriverusereplacein theirgo.mods, so I don'texpect the extra module to add any extra headache to our releases.
Discerning readers may notice that driver implementations reference some
types in
riverdriverand other types inrivertypes. I'm definingwhich is which as:
riverdrivershould contain anything unstable thatwe're allowed to change and which a user shouldn't be accessing anyway
(e.g. say function parameter structs).
rivertypesis for specificconstructs that external River users will need to reference like
JobRow.Lastly, I'll say that this isn't final. I've refactored a lot of tests
to ensure that this patch doesn't have any major outstanding TODOs and
doesn't leave the code any worse that when it started, but there are a
lot of follow up improvements that I can potentially make, and expect
to.