MassTransit Transactional Outbox Table Schemas

MassTransit has a transactional outbox feature that integrates with Entity Framework so that your published messages are persisted in the same database transaction as your model changes.

If you use Entity Framework database migrations, this works out fine: it will add itself to your database context and migrate the database along with your own schema changes.

But if you’re like me and don’t trust code-first migrations, you need the raw SQL CREATE TABLE commands.

You can get them by creating a dummy database migration and generating the SQL:

dotnet ef migrations add Dummy
dotnet ef migrations script

Or you can copy them from me:

SQL Server

https://gist.github.com/akatakritos/0608eb16094f42b6ed0ea9b8563d370a

-- https://github.com/MassTransit/MassTransit/discussions/4847
-- InboxState (SQL Server): inbox table for the MassTransit transactional outbox.
-- Keyed by an identity column; each (MessageId, ConsumerId) pair may appear
-- only once, enforced by the alternate-key UNIQUE constraint below.
CREATE TABLE [InboxState]
(
    [Id]                 bigint           NOT NULL IDENTITY,
    [MessageId]          uniqueidentifier NOT NULL,
    [ConsumerId]         uniqueidentifier NOT NULL,
    [LockId]             uniqueidentifier NOT NULL,
    [RowVersion]         rowversion       NULL,
    [Received]           datetime2        NOT NULL,
    [ReceiveCount]       int              NOT NULL,
    [ExpirationTime]     datetime2        NULL,
    [Consumed]           datetime2        NULL,
    [Delivered]          datetime2        NULL,
    [LastSequenceNumber] bigint           NULL,

    CONSTRAINT [PK_InboxState]
        PRIMARY KEY ([Id]),
    CONSTRAINT [AK_InboxState_MessageId_ConsumerId]
        UNIQUE ([MessageId], [ConsumerId])
);

-- OutboxMessage (SQL Server): one row per message stored by the MassTransit
-- transactional outbox. SequenceNumber is an identity column and the primary key.
-- InboxMessageId, InboxConsumerId and OutboxId are all nullable; the filtered
-- unique indexes created further down only cover rows where they are set.
CREATE TABLE [OutboxMessage]
(
    [SequenceNumber]     bigint           NOT NULL IDENTITY,
    [EnqueueTime]        datetime2        NULL,
    [SentTime]           datetime2        NOT NULL,
    [Headers]            nvarchar(max)    NULL,
    [Properties]         nvarchar(max)    NULL,
    [InboxMessageId]     uniqueidentifier NULL,
    [InboxConsumerId]    uniqueidentifier NULL,
    [OutboxId]           uniqueidentifier NULL,
    [MessageId]          uniqueidentifier NOT NULL,
    [ContentType]        nvarchar(256)    NOT NULL,
    [MessageType]        nvarchar(max)    NOT NULL,
    [Body]               nvarchar(max)    NOT NULL,
    [ConversationId]     uniqueidentifier NULL,
    [CorrelationId]      uniqueidentifier NULL,
    [InitiatorId]        uniqueidentifier NULL,
    [RequestId]          uniqueidentifier NULL,
    [SourceAddress]      nvarchar(256)    NULL,
    [DestinationAddress] nvarchar(256)    NULL,
    [ResponseAddress]    nvarchar(256)    NULL,
    [FaultAddress]       nvarchar(256)    NULL,
    [ExpirationTime]     datetime2        NULL,

    CONSTRAINT [PK_OutboxMessage]
        PRIMARY KEY ([SequenceNumber])
);

-- OutboxState (SQL Server): outbox state table for the MassTransit
-- transactional outbox, keyed directly by OutboxId (no identity column here).
CREATE TABLE [OutboxState]
(
    [OutboxId]           uniqueidentifier NOT NULL,
    [LockId]             uniqueidentifier NOT NULL,
    [RowVersion]         rowversion       NULL,
    [Created]            datetime2        NOT NULL,
    [Delivered]          datetime2        NULL,
    [LastSequenceNumber] bigint           NULL,

    CONSTRAINT [PK_OutboxState]
        PRIMARY KEY ([OutboxId])
);

-- Secondary indexes for the SQL Server outbox/inbox tables.

-- Plain (non-unique) single-column indexes.
CREATE INDEX [IX_InboxState_Delivered]
    ON [InboxState] ([Delivered]);

CREATE INDEX [IX_OutboxMessage_EnqueueTime]
    ON [OutboxMessage] ([EnqueueTime]);

CREATE INDEX [IX_OutboxMessage_ExpirationTime]
    ON [OutboxMessage] ([ExpirationTime]);

-- Filtered unique index: only rows where both inbox columns are non-NULL
-- are indexed and subject to the uniqueness rule.
CREATE UNIQUE INDEX [IX_OutboxMessage_InboxMessageId_InboxConsumerId_SequenceNumber]
    ON [OutboxMessage] ([InboxMessageId], [InboxConsumerId], [SequenceNumber])
    WHERE [InboxMessageId] IS NOT NULL AND [InboxConsumerId] IS NOT NULL;

-- Filtered unique index: rows with a NULL OutboxId are excluded.
CREATE UNIQUE INDEX [IX_OutboxMessage_OutboxId_SequenceNumber]
    ON [OutboxMessage] ([OutboxId], [SequenceNumber])
    WHERE [OutboxId] IS NOT NULL;

CREATE INDEX [IX_OutboxState_Created]
    ON [OutboxState] ([Created]);

Postgres

https://gist.github.com/akatakritos/18a05732cabec1ce2c82318249bac6c1

-- inbox_state (Postgres): snake_case counterpart of the SQL Server [InboxState] table.
-- id is an identity column and the primary key; each (message_id, consumer_id)
-- pair may appear only once.
CREATE TABLE inbox_state (
    id bigint GENERATED BY DEFAULT AS IDENTITY,
    message_id uuid NOT NULL,
    consumer_id uuid NOT NULL,
    lock_id uuid NOT NULL,
    row_version bytea,
    received timestamp with time zone NOT NULL,
    receive_count integer NOT NULL,
    expiration_time timestamp with time zone,
    consumed timestamp with time zone,
    delivered timestamp with time zone,
    last_sequence_number bigint,

    CONSTRAINT pk_inbox_state
        PRIMARY KEY (id),
    CONSTRAINT ak_inbox_state_message_id_consumer_id
        UNIQUE (message_id, consumer_id)
);


-- outbox_message (Postgres): snake_case counterpart of the SQL Server [OutboxMessage] table.
-- sequence_number is an identity column and the primary key.
CREATE TABLE outbox_message (
    sequence_number bigint GENERATED BY DEFAULT AS IDENTITY,
    enqueue_time timestamp with time zone,
    sent_time timestamp with time zone NOT NULL,
    headers text,
    properties text,
    inbox_message_id uuid,
    inbox_consumer_id uuid,
    outbox_id uuid,
    message_id uuid NOT NULL,
    content_type character varying(256) NOT NULL,
    message_type text NOT NULL,
    body text NOT NULL,
    conversation_id uuid,
    correlation_id uuid,
    initiator_id uuid,
    request_id uuid,
    source_address character varying(256),
    destination_address character varying(256),
    response_address character varying(256),
    fault_address character varying(256),
    expiration_time timestamp with time zone,

    CONSTRAINT pk_outbox_message
        PRIMARY KEY (sequence_number)
);

-- outbox_state (Postgres): snake_case counterpart of the SQL Server [OutboxState] table,
-- keyed directly by outbox_id.
-- NOTE(review): the SQL Server script also creates secondary indexes (e.g. on
-- [OutboxState].[Created]); no equivalent CREATE INDEX statements for the
-- Postgres tables are visible here -- confirm whether they are defined elsewhere.
CREATE TABLE outbox_state (
    outbox_id uuid NOT NULL,
    lock_id uuid NOT NULL,
    row_version bytea,
    created timestamp with time zone NOT NULL,
    delivered timestamp with time zone,
    last_sequence_number bigint,

    CONSTRAINT pk_outbox_state
        PRIMARY KEY (outbox_id)
);