Postgres Workqueue
В этом посте речь пойдёт про организацию очереди отложенных задач в базе данных. На практике выяснилось, что очень большое количество разработчиков эту идею принимают с большим трудом, сразу же выражают обеспокоенность искусственно созданной нагрузкой. С одной стороны, это грамотное замечание. С другой — это тот барьер, через который надо научиться переступать, наблюдать за вашим приложением и грамотно настраивать базу.
Что мы не будем рассматривать? Мы не будем ничего говорить про оптимизацию и партицирование. Это та часть задачи, которая будет зависеть от профиля нагрузки на ваше приложение.
Итак, задача:
- Организовать фоновую обработку поступающих задач.
- Каждая задача должна быть выполнена одним и только одним обработчиком.
- Допускается повторное взятие задачи из очереди после определенного таймаута.
Задачи на исполнение мы будем хранить в таблице workqueue
. Одна строка — одна
задача:
create table workqueue
(
id bigserial primary key,
payload jsonb not null,
enqueued_at timestamptz not null default now(),
locked_at timestamptz,
locked_by text
)
Поля locked_at
и locked_by
обновляются в момент взятия задачи в работу. В
качестве значения поля locked_by
удобно использовать адрес хоста или имя пода
в k8s
.
В поле payload
хранится вся, необходимя для выполнения задачи информация.
Как будет выглядеть процесс постановки задачи в очередь? Очень просто:
insert into workqueue (payload)
values ('{"foo": "bar"}')
Теперь поговорим о получении следующего экземпляра задачи из очереди. Мы условились, что у задачи есть время, за которое она должна быть выполнена. Сама задача должна быть выполнена только в одном экземпляре. То есть в один момент времени над выполнением задачи работает только одни обработчик.
with task as (select id
from workqueue
where locked_by is null
or locked_at < now() - :retry_timeout
order by id for update skip locked
limit 1)
update workqueue q
set locked_by = :worker_name,
locked_at = now()
from task
where task.id = q.id
returning q.id, q.payload
Как только задача будет выполнена, её следует удалить из таблицы очереди:
delete
from workqueue
where id = :id
Для иллюстрации работы очереди этого достаточно. Мы же в реальных условиях перекладываем задачу в момент удаления в архивную таблицу.
На этом, собственно, всё. Очередь задач на базе — довольно простая вещь. Соблюдайте паттерн:
- Поставил задачу
- Обработал задачу
- Удалил задачу
Подход с очередью применяется достаточно часто. В нём нет ничего страшного. По
опыту, грамотно настроенный postgres
вывозит довольно большие нагрузки, нет
причин переживать за “искусственность” нагрузки. Приведу цифры: 2k TPS не влияют
на производительность, но могут быть ощутимы на вашем железе и сетапе.
Применяйте подход с умом.