Postgres Workqueue

В этом посте речь пойдёт про организацию очереди отложенных задач в базе данных. На практике выяснилось, что очень большое количество разработчиков эту идею принимают с большим трудом, сразу же выражают обеспокоенность искусственно созданной нагрузкой. С одной стороны, это грамотное замечание. С другой — это тот барьер, через который надо научиться переступать, наблюдать за вашим приложением и грамотно настраивать базу.

Что мы не будем рассматривать? Мы не будем ничего говорить про оптимизацию и партицирование. Это та часть задачи, которая будет зависеть от профиля нагрузки на ваше приложение.

Итак, задача:

  • Организовать фоновую обработку поступающих задач.
  • Каждая задача должна быть выполнена одним и только одним обработчиком.
  • Допускается повторное взятие задачи из очереди после определенного таймаута.

Задачи на исполнение мы будем хранить в таблице workqueue. Одна строка — одна задача:

create table workqueue
(
  id          bigserial primary key,
  payload     jsonb       not null,
  enqueued_at timestamptz not null default now(),
  locked_at   timestamptz,
  locked_by   text
)

Поля locked_at и locked_by обновляются в момент взятия задачи в работу. В качестве значения поля locked_by удобно использовать адрес хоста или имя пода в k8s.

В поле payload хранится вся, необходимя для выполнения задачи информация.

Как будет выглядеть процесс постановки задачи в очередь? Очень просто:

insert into workqueue (payload)
values ('{"foo": "bar"}')

Теперь поговорим о получении следующего экземпляра задачи из очереди. Мы условились, что у задачи есть время, за которое она должна быть выполнена. Сама задача должна быть выполнена только в одном экземпляре. То есть в один момент времени над выполнением задачи работает только одни обработчик.

with task as (select id
              from workqueue
              where locked_by is null
                 or locked_at < now() - :retry_timeout
              order by id for update skip locked
              limit 1)
update workqueue q
set locked_by = :worker_name,
    locked_at = now()
from task
where task.id = q.id
returning q.id, q.payload

Как только задача будет выполнена, её следует удалить из таблицы очереди:

delete
from workqueue
where id = :id

Для иллюстрации работы очереди этого достаточно. Мы же в реальных условиях перекладываем задачу в момент удаления в архивную таблицу.

На этом, собственно, всё. Очередь задач на базе — довольно простая вещь. Соблюдайте паттерн:

  • Поставил задачу
  • Обработал задачу
  • Удалил задачу

Подход с очередью применяется достаточно часто. В нём нет ничего страшного. По опыту, грамотно настроенный postgres вывозит довольно большие нагрузки, нет причин переживать за “искусственность” нагрузки. Приведу цифры: 2k TPS не влияют на производительность, но могут быть ощутимы на вашем железе и сетапе. Применяйте подход с умом.

Ссылки