factos/factos_pog
PostgreSQL backend for Factos using the pog package.
This backend stores accepted facts in an append-only factos_events table.
The event history is the source of truth; projections and stream-shaped reads
are derived views over that history.
The context dispatch flow follows the Command Context Consistency idea from “Simply Event Sourcing”: a command selects the facts required for its decision, folds them into temporary state, decides new facts, and appends those facts only when no relevant facts appeared after the observed context position.
The query contract is intentionally tag-based. PostgreSQL stores opaque event bytes, an event type, and tags. This keeps domain serialization outside the backend, but it means any payload value needed for a selective consistency query must be written as a tag.
Types
pub type Append {
Append(
current_revision: Int,
position: factos.SequencePosition,
)
}
Constructors
-
Append(current_revision: Int, position: factos.SequencePosition)Result of a successful append.
current_revisionis the latest revision of the target stream after the append.positionis the global position of the last inserted event, orNoPositionwhen no events were produced.
pub type DecodeError {
UnknownEvent
InvalidData
}
Constructors
-
UnknownEvent -
InvalidData
pub type Dispatch(event) {
Dispatch(append: Append, events: List(factos.Recorded(event)))
}
Constructors
-
Dispatch(append: Append, events: List(factos.Recorded(event)))Result of a successful dispatch.
appendhas the stream revision and final global position.eventsare the committed events recorded by this dispatch, suitable for pure Factos reactors or backend-specific durable effect adapters.
pub type Error(domain_error) {
DomainError(domain_error)
StoreError(pog.QueryError)
AppendConditionFailed(factos.AppendCondition)
DecodeError(DecodeError)
}
Constructors
-
DomainError(domain_error)The decider rejected the command with a domain error.
-
StoreError(pog.QueryError)PostgreSQL or
pogreturned an error while running a query. -
AppendConditionFailed(factos.AppendCondition)A stream revision or context append condition failed.
-
DecodeError(DecodeError)
pub type EventCodec(event) {
EventCodec(
encode: fn(event) -> Proposed(event),
decode: fn(StoredEvent) -> Result(
factos.Decoded(event),
DecodeError,
),
)
}
Constructors
-
EventCodec( encode: fn(event) -> Proposed(event), decode: fn(StoredEvent) -> Result( factos.Decoded(event), DecodeError, ), )Application-owned PostgreSQL event codec.
encodeconverts a domain event into bytes and metadata.decodeconverts a stored row back into afactos.Decodeddomain event. Decode failures are kept in the application’s own error type and wrapped asDecodeError.
pub type Proposed(event) {
Proposed(
id: String,
event: event,
type_: factos.EventType,
version: Int,
tags: List(factos.Tag),
metadata: factos.Metadata,
data: BitArray,
)
}
Constructors
-
Proposed( id: String, event: event, type_: factos.EventType, version: Int, tags: List(factos.Tag), metadata: factos.Metadata, data: BitArray, )A domain event prepared for PostgreSQL persistence.
The application codec creates this value.
idshould identify the event for the application.type_andtagsare store-visible query metadata.datais opaque bytes owned by the application codec.
pub type StoredEvent {
StoredEvent(
position: Int,
id: String,
stream: String,
revision: Int,
type_: factos.EventType,
version: Int,
tags: List(factos.Tag),
metadata: factos.Metadata,
data: BitArray,
)
}
Constructors
-
StoredEvent( position: Int, id: String, stream: String, revision: Int, type_: factos.EventType, version: Int, tags: List(factos.Tag), metadata: factos.Metadata, data: BitArray, )A raw event row read from PostgreSQL before domain decoding.
Decoders receive this value so they can inspect stored metadata and bytes.
positionis the global append order.revisionis the per-stream revision.
Values
pub fn codec(
encode encode: fn(event) -> Proposed(event),
decode decode: fn(StoredEvent) -> Result(
factos.Decoded(event),
DecodeError,
),
) -> EventCodec(event)
Create a new codec.
encode turns a domain event into a Proposed event ready for persistence.
decode turns a stored row back into a domain event. Decode failures are
returned as DecodeError and stop load/read flows rather than panicking.
pub fn dispatch(
connection: pog.Connection,
stream stream_name: String,
decider decider: factos.Decider(
command,
state,
event,
domain_error,
),
codec codec: EventCodec(event),
command command: command,
) -> Result(Dispatch(event), Error(domain_error))
Run a stream-based read-decide-append command flow.
Use this when one stream is intentionally the consistency boundary. It remains
useful, but it is not required by Event Sourcing. For command-specific rules,
prefer dispatch_with_query so the protected boundary follows the decision.
pub fn dispatch_with_query(
connection: pog.Connection,
stream stream_name: String,
query query: factos.Query,
decider decider: factos.Decider(
command,
state,
event,
domain_error,
),
codec codec: EventCodec(event),
command command: command,
) -> Result(Dispatch(event), Error(domain_error))
Run a full context-first read-decide-append command flow.
PostgreSQL does not have a native primitive for “append if no row matching this
arbitrary event-type/tag query appeared after position N”. This backend uses a
transaction plus lock table factos_events in exclusive mode to make the read,
context check, and append atomic for all Factos queries.
That lock is the main throughput tradeoff: unrelated writers queue behind each other even if their contexts do not overlap. It is deliberately simple and correct. A future backend can use advisory locks or query-specific lock keys, but only if it keeps the same context-stability guarantee.
pub fn error_to_string(
error: Error(domain_error),
domain_error_to_string: fn(domain_error) -> String,
) -> String
pub fn load_stream(
connection: pog.Connection,
stream stream_name: String,
decider decider: factos.Decider(
command,
state,
event,
domain_error,
),
codec codec: EventCodec(event),
) -> Result(
factos.LoadedStream(event, state),
Error(domain_error),
)
Load and fold one stream.
This supports classic stream-revision consistency. The returned
factos.LoadedStream contains the folded state, decoded recorded events, and
current stream revision.
pub fn migrate(
connection: pog.Connection,
) -> Result(Nil, Error(domain_error))
The schema is an append-only factos_events table with a global identity
position, per-stream revision, event type, newline-encoded tags, and
opaque data bytes. Queryable tags are also mirrored into
factos_event_tags, which gives event-type/tag context reads indexed SQL
plans instead of loading the whole event table into the application.
pub fn read_context(
connection: pog.Connection,
query query: factos.Query,
decider decider: factos.Decider(
command,
state,
event,
domain_error,
),
codec codec: EventCodec(event),
) -> Result(factos.Context(event, state), Error(domain_error))
Read and fold the facts selected by a command-context query.
The backend reads stored rows, decodes them with the supplied codec, filters
them with factos.matches_query, folds matching events with the decider’s
evolve function, and returns a factos.Context with a
FailIfEventsMatch(query, after) append condition.