MassTransit Transactional Outbox Table Schemas
MassTransit has a transactional outbox feature that integrates with Entity Framework so that your published messages are persisted in the same database transaction as your model changes.
If you use Entity Framework database migrations, this works out fine: the outbox adds itself to your database context and is migrated along with your own schema changes.
But if you’re like me and don’t trust code-first migrations, you need the raw SQL CREATE TABLE commands.
You can get them by creating a dummy database migration and generating the SQL:
# Create a throwaway migration named "Dummy".
dotnet ef migrations add Dummy
# Generate the SQL script for the migrations; the CREATE TABLE commands are in its output.
dotnet ef migrations script
Or you can copy them from me:
SQL Server
https://gist.github.com/akatakritos/0608eb16094f42b6ed0ea9b8563d370a
-- https://github.com/MassTransit/MassTransit/discussions/4847
-- MassTransit inbox state table (SQL Server).
-- Rows are keyed by a surrogate identity [Id]; the (MessageId, ConsumerId)
-- pair is additionally constrained to be unique.
CREATE TABLE [InboxState]
(
    [Id]                 bigint           IDENTITY(1, 1) NOT NULL,
    [MessageId]          uniqueidentifier NOT NULL,
    [ConsumerId]         uniqueidentifier NOT NULL,
    [LockId]             uniqueidentifier NOT NULL,
    -- NOTE(review): presumably the EF Core concurrency token -- confirm against the entity mapping.
    [RowVersion]         rowversion       NULL,
    [Received]           datetime2        NOT NULL,
    [ReceiveCount]       int              NOT NULL,
    [ExpirationTime]     datetime2        NULL,
    [Consumed]           datetime2        NULL,
    [Delivered]          datetime2        NULL,
    [LastSequenceNumber] bigint           NULL,

    CONSTRAINT [PK_InboxState]
        PRIMARY KEY ([Id]),
    CONSTRAINT [AK_InboxState_MessageId_ConsumerId]
        UNIQUE ([MessageId], [ConsumerId])
);
-- MassTransit outbox message table (SQL Server).
-- [SequenceNumber] is the identity primary key; the message body, headers and
-- properties are stored as nvarchar(max) text.
CREATE TABLE [OutboxMessage]
(
    [SequenceNumber]     bigint           IDENTITY(1, 1) NOT NULL,
    [EnqueueTime]        datetime2        NULL,
    [SentTime]           datetime2        NOT NULL,
    [Headers]            nvarchar(max)    NULL,
    [Properties]         nvarchar(max)    NULL,

    -- All three owner references are nullable.
    [InboxMessageId]     uniqueidentifier NULL,
    [InboxConsumerId]    uniqueidentifier NULL,
    [OutboxId]           uniqueidentifier NULL,

    [MessageId]          uniqueidentifier NOT NULL,
    [ContentType]        nvarchar(256)    NOT NULL,
    [MessageType]        nvarchar(max)    NOT NULL,
    [Body]               nvarchar(max)    NOT NULL,

    [ConversationId]     uniqueidentifier NULL,
    [CorrelationId]      uniqueidentifier NULL,
    [InitiatorId]        uniqueidentifier NULL,
    [RequestId]          uniqueidentifier NULL,

    [SourceAddress]      nvarchar(256)    NULL,
    [DestinationAddress] nvarchar(256)    NULL,
    [ResponseAddress]    nvarchar(256)    NULL,
    [FaultAddress]       nvarchar(256)    NULL,

    [ExpirationTime]     datetime2        NULL,

    CONSTRAINT [PK_OutboxMessage]
        PRIMARY KEY ([SequenceNumber])
);
-- MassTransit outbox state table (SQL Server), keyed by [OutboxId].
CREATE TABLE [OutboxState]
(
    [OutboxId]           uniqueidentifier NOT NULL,
    [LockId]             uniqueidentifier NOT NULL,
    -- NOTE(review): presumably the EF Core concurrency token -- confirm against the entity mapping.
    [RowVersion]         rowversion       NULL,
    [Created]            datetime2        NOT NULL,
    [Delivered]          datetime2        NULL,
    [LastSequenceNumber] bigint           NULL,

    CONSTRAINT [PK_OutboxState]
        PRIMARY KEY ([OutboxId])
);
-- Secondary indexes for the MassTransit inbox/outbox tables (SQL Server).

CREATE INDEX [IX_InboxState_Delivered]
    ON [InboxState] ([Delivered]);

CREATE INDEX [IX_OutboxMessage_EnqueueTime]
    ON [OutboxMessage] ([EnqueueTime]);

CREATE INDEX [IX_OutboxMessage_ExpirationTime]
    ON [OutboxMessage] ([ExpirationTime]);

-- Filtered unique index: only rows where both inbox columns are populated are indexed.
CREATE UNIQUE INDEX [IX_OutboxMessage_InboxMessageId_InboxConsumerId_SequenceNumber]
    ON [OutboxMessage] ([InboxMessageId], [InboxConsumerId], [SequenceNumber])
    WHERE [InboxMessageId] IS NOT NULL
      AND [InboxConsumerId] IS NOT NULL;

-- Filtered unique index: only rows with a non-NULL [OutboxId] are indexed.
CREATE UNIQUE INDEX [IX_OutboxMessage_OutboxId_SequenceNumber]
    ON [OutboxMessage] ([OutboxId], [SequenceNumber])
    WHERE [OutboxId] IS NOT NULL;

CREATE INDEX [IX_OutboxState_Created]
    ON [OutboxState] ([Created]);
Postgres
https://gist.github.com/akatakritos/18a05732cabec1ce2c82318249bac6c1
-- MassTransit inbox state table (PostgreSQL).
-- Rows are keyed by the identity column id; the (message_id, consumer_id)
-- pair is additionally constrained to be unique.
CREATE TABLE inbox_state (
    id                   bigint      GENERATED BY DEFAULT AS IDENTITY,
    message_id           uuid        NOT NULL,
    consumer_id          uuid        NOT NULL,
    lock_id              uuid        NOT NULL,
    row_version          bytea,
    received             timestamptz NOT NULL,
    receive_count        integer     NOT NULL,
    expiration_time      timestamptz,
    consumed             timestamptz,
    delivered            timestamptz,
    last_sequence_number bigint,

    CONSTRAINT pk_inbox_state
        PRIMARY KEY (id),
    CONSTRAINT ak_inbox_state_message_id_consumer_id
        UNIQUE (message_id, consumer_id)
);
-- MassTransit outbox message table (PostgreSQL).
-- sequence_number is the identity primary key; the message body, headers and
-- properties are stored as text.
CREATE TABLE outbox_message (
    sequence_number     bigint       GENERATED BY DEFAULT AS IDENTITY,
    enqueue_time        timestamptz,
    sent_time           timestamptz  NOT NULL,
    headers             text,
    properties          text,

    -- All three owner references are nullable.
    inbox_message_id    uuid,
    inbox_consumer_id   uuid,
    outbox_id           uuid,

    message_id          uuid         NOT NULL,
    content_type        varchar(256) NOT NULL,
    message_type        text         NOT NULL,
    body                text         NOT NULL,

    conversation_id     uuid,
    correlation_id      uuid,
    initiator_id        uuid,
    request_id          uuid,

    source_address      varchar(256),
    destination_address varchar(256),
    response_address    varchar(256),
    fault_address       varchar(256),

    expiration_time     timestamptz,

    CONSTRAINT pk_outbox_message
        PRIMARY KEY (sequence_number)
);
-- MassTransit outbox state table (PostgreSQL), keyed by outbox_id.
CREATE TABLE outbox_state (
    outbox_id            uuid        NOT NULL,
    lock_id              uuid        NOT NULL,
    row_version          bytea,
    created              timestamptz NOT NULL,
    delivered            timestamptz,
    last_sequence_number bigint,

    CONSTRAINT pk_outbox_state
        PRIMARY KEY (outbox_id)
);