Enhanced SSE
relay provides three ways to consume a Server-Sent Events stream, each suited to a different use case:
| Method | Use when |
|---|---|
| ExecuteSSE | Simple streaming with a callback, no reconnect |
| ExecuteSSEWithReconnect | Long-lived streams that must survive disconnects |
| ExecuteSSEStream | Channel-based consumption, fan-out, or select loops |
SSEEvent
type SSEEvent struct {
ID string
Event string // defaults to "message" when absent
Data string
Retry int // reconnect hint in milliseconds; 0 when absent
}
Each SSEEvent corresponds to one fully-parsed event block from the stream. Multi-line data fields are joined with a newline.
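To make these parsing rules concrete, here is a minimal sketch of how one event block might be turned into an SSEEvent. It is not relay's actual parser: the parseBlock helper and the local SSEEvent type are illustrative assumptions that mirror the struct above and use only the standard library.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// SSEEvent mirrors the fields of relay.SSEEvent documented above.
type SSEEvent struct {
	ID    string
	Event string
	Data  string
	Retry int
}

// parseBlock converts the lines of a single event block into an SSEEvent.
// Hypothetical helper for illustration only.
func parseBlock(block string) SSEEvent {
	ev := SSEEvent{Event: "message"} // default type when no "event" field is present
	var data []string
	for _, line := range strings.Split(block, "\n") {
		line = strings.TrimSuffix(line, "\r")
		if line == "" || strings.HasPrefix(line, ":") {
			continue // skip blank lines and comment lines
		}
		field, value, _ := strings.Cut(line, ":")
		value = strings.TrimPrefix(value, " ") // one optional space after the colon
		switch field {
		case "id":
			ev.ID = value
		case "event":
			ev.Event = value
		case "data":
			data = append(data, value)
		case "retry":
			if ms, err := strconv.Atoi(value); err == nil {
				ev.Retry = ms
			}
		}
	}
	// Multi-line data fields are joined with a newline.
	ev.Data = strings.Join(data, "\n")
	return ev
}

func main() {
	ev := parseBlock("id: 42\ndata: first line\ndata: second line\nretry: 5000")
	fmt.Printf("id=%s event=%s retry=%d data=%q\n", ev.ID, ev.Event, ev.Retry, ev.Data)
}
```

In this sketch an unparseable retry value is ignored, so Retry stays 0, matching the "0 when absent" convention of the struct.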
SSEClientConfig
SSEClientConfig controls the reconnect behaviour of ExecuteSSEWithReconnect.
type SSEClientConfig struct {
// MaxReconnects is the maximum number of reconnect attempts.
// 0 means unlimited.
MaxReconnects int
// ReconnectDelay is the base delay between reconnects.
// If the server sends a "retry" field, that takes precedence.
// Defaults to 3s when zero.
ReconnectDelay time.Duration
// EventTypes filters which event types are delivered to the handler.
// An empty slice delivers all events.
EventTypes []string
}
ExecuteSSEWithReconnect
func (c *Client) ExecuteSSEWithReconnect(
req *Request,
cfg SSEClientConfig,
handler SSEHandler,
) error
ExecuteSSEWithReconnect opens the SSE stream and automatically reconnects on disconnect. On each reconnect it sends the Last-Event-ID header with the last received event ID, allowing the server to resume from where it left off.
Parameters:
- req -- the request to send. Typically a GET with Accept: text/event-stream.
- cfg -- reconnect settings (delay, max attempts, event-type filter).
- handler -- callback invoked for each event; return false to stop the stream.
Reconnect logic:
- Send the request (with Last-Event-ID on subsequent attempts).
- Read events until the stream closes or the handler returns false.
- If the stream closed normally, reconnect after cfg.ReconnectDelay (or the server-supplied retry value).
- Stop after cfg.MaxReconnects attempts (or never, if MaxReconnects is 0).
Basic usage
package main
import (
"fmt"
"log"
"time"
"github.com/jhonsferg/relay"
)
func main() {
client, err := relay.New(
relay.WithBaseURL("https://api.example.com"),
)
if err != nil {
log.Fatal(err)
}
cfg := relay.SSEClientConfig{
MaxReconnects: 5,
ReconnectDelay: 2 * time.Second,
}
err = client.ExecuteSSEWithReconnect(
client.NewRequest("GET", "/events").
WithHeader("Accept", "text/event-stream"),
cfg,
func(ev relay.SSEEvent) bool {
fmt.Printf("[%s] %s\n", ev.Event, ev.Data)
return true // continue
},
)
if err != nil {
log.Fatal(err)
}
}
Event-type filtering
Pass a list of event types to EventTypes. Only events whose event field matches one of the listed types are delivered to the handler; all others are silently discarded.
package main
import (
"fmt"
"log"
"time"
"github.com/jhonsferg/relay"
)
func main() {
client, err := relay.New(
relay.WithBaseURL("https://notifications.example.com"),
)
if err != nil {
log.Fatal(err)
}
cfg := relay.SSEClientConfig{
MaxReconnects: 10,
ReconnectDelay: 3 * time.Second,
// Only receive "order-update" and "payment-confirmed" events
EventTypes: []string{"order-update", "payment-confirmed"},
}
err = client.ExecuteSSEWithReconnect(
client.NewRequest("GET", "/stream").
WithHeader("Accept", "text/event-stream"),
cfg,
func(ev relay.SSEEvent) bool {
switch ev.Event {
case "order-update":
fmt.Println("order updated:", ev.Data)
case "payment-confirmed":
fmt.Println("payment confirmed:", ev.Data)
}
return true
},
)
if err != nil {
log.Fatal(err)
}
}
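The filtering rule itself is small enough to sketch in isolation. The newTypeFilter helper below is hypothetical and only illustrates the documented semantics (an empty list delivers everything, otherwise only listed types pass); it is not how relay is known to implement the check.

```go
package main

import "fmt"

// newTypeFilter returns a predicate that reports whether an event of the
// given type should be delivered. Hypothetical helper for illustration only.
func newTypeFilter(eventTypes []string) func(eventType string) bool {
	if len(eventTypes) == 0 {
		// An empty slice delivers all events.
		return func(string) bool { return true }
	}
	allowed := make(map[string]struct{}, len(eventTypes))
	for _, t := range eventTypes {
		allowed[t] = struct{}{}
	}
	return func(eventType string) bool {
		_, ok := allowed[eventType]
		return ok
	}
}

func main() {
	deliver := newTypeFilter([]string{"order-update", "payment-confirmed"})
	for _, t := range []string{"order-update", "heartbeat", "payment-confirmed"} {
		fmt.Printf("%-18s delivered=%v\n", t, deliver(t))
	}
}
```

Because SSEEvent.Event defaults to "message" when the stream omits the event field, a non-empty EventTypes list presumably needs to include "message" if untyped events should still reach the handler.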
Stopping early
Return false from the handler to close the connection immediately without reconnecting.
package main
import (
"fmt"
"log"
"github.com/jhonsferg/relay"
)
func main() {
client, err := relay.New(
relay.WithBaseURL("https://api.example.com"),
)
if err != nil {
log.Fatal(err)
}
received := 0
err = client.ExecuteSSEWithReconnect(
client.NewRequest("GET", "/events").
WithHeader("Accept", "text/event-stream"),
relay.SSEClientConfig{},
func(ev relay.SSEEvent) bool {
fmt.Println(ev.Data)
received++
return received < 5 // stop after 5 events
},
)
if err != nil {
log.Fatal(err)
}
}
ExecuteSSEStream
func (c *Client) ExecuteSSEStream(
ctx context.Context,
req *Request,
) (<-chan SSEEvent, <-chan error)
ExecuteSSEStream returns two channels: one for events and one for errors. The stream runs in a background goroutine. Cancel ctx to stop it.
Both channels are closed when the stream ends (normally or on error). The caller must read from both channels until they are closed, or use a select loop that also listens on the context.
Basic channel consumption
package main
import (
"context"
"fmt"
"log"
"github.com/jhonsferg/relay"
)
func main() {
client, err := relay.New(
relay.WithBaseURL("https://api.example.com"),
)
if err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
events, errs := client.ExecuteSSEStream(
ctx,
client.NewRequest("GET", "/events").
WithHeader("Accept", "text/event-stream"),
)
for {
select {
case ev, ok := <-events:
if !ok {
return // stream closed
}
fmt.Printf("[%s] %s\n", ev.Event, ev.Data)
case err, ok := <-errs:
if !ok {
return
}
log.Printf("stream error: %v", err)
return
}
}
}
Fan-out to multiple consumers
Because ExecuteSSEStream returns plain Go channels, you can fan out to multiple consumers using a dispatcher goroutine.
package main
import (
"context"
"fmt"
"log"
"github.com/jhonsferg/relay"
)
func fanOut(in <-chan relay.SSEEvent, n int) []<-chan relay.SSEEvent {
outs := make([]chan relay.SSEEvent, n)
for i := range outs {
outs[i] = make(chan relay.SSEEvent, 16)
}
go func() {
for ev := range in {
for _, out := range outs {
out <- ev
}
}
for _, out := range outs {
close(out)
}
}()
result := make([]<-chan relay.SSEEvent, n)
for i, ch := range outs {
result[i] = ch
}
return result
}
func main() {
client, err := relay.New(
relay.WithBaseURL("https://api.example.com"),
)
if err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
events, errs := client.ExecuteSSEStream(
ctx,
client.NewRequest("GET", "/events").
WithHeader("Accept", "text/event-stream"),
)
consumers := fanOut(events, 3)
go func() {
for err := range errs {
log.Printf("stream error: %v", err)
}
}()
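// Each consumer processes events independently and signals when its channel closes
done := make(chan struct{})
for i, ch := range consumers {
go func(id int, c <-chan relay.SSEEvent) {
defer func() { done <- struct{}{} }()
for ev := range c {
fmt.Printf("consumer %d: %s\n", id, ev.Data)
}
}(i, ch)
}
// Block until every consumer has drained its channel, i.e. the stream has ended.
// Waiting on ctx.Done() here would block forever, because nothing cancels ctx
// before main returns.
for range consumers {
<-done
}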
}
Note: ExecuteSSEStream does not reconnect automatically. Combine it with your own retry loop, or use ExecuteSSEWithReconnect when reconnection is needed.
Choosing the Right Method
| Requirement | Method |
|---|---|
| Simple one-shot stream with callback | ExecuteSSE |
| Durable stream with auto-reconnect | ExecuteSSEWithReconnect |
| Channel-based or select-driven consumption | ExecuteSSEStream |
| Fan-out to multiple consumers | ExecuteSSEStream + dispatcher goroutine |
| Filter by event type | ExecuteSSEWithReconnect with SSEClientConfig.EventTypes |
Last-Event-ID Resumption
ExecuteSSEWithReconnect tracks the id field of the last received event. On every reconnect it sends this value in the Last-Event-ID request header. A well-behaved SSE server uses this header to replay missed events, providing at-least-once delivery across reconnects.
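For context, here is a minimal sketch of what the server side of this contract could look like, using only net/http and net/http/httptest. The backlog slice, replayHandler, and the replay helper are hypothetical names invented for this example; a real server would keep its own event log and would normally hold the connection open instead of returning after the replay.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strconv"
)

// backlog is a hypothetical in-memory event log; entry i has event ID i+1.
var backlog = []string{"created", "paid", "shipped", "delivered"}

// replayHandler writes every backlog entry newer than the Last-Event-ID
// request header as an SSE event block, then returns.
func replayHandler(w http.ResponseWriter, r *http.Request) {
	// Atoi yields 0 for a missing or malformed header, which replays everything.
	last, _ := strconv.Atoi(r.Header.Get("Last-Event-ID"))
	if last < 0 {
		last = 0
	}
	w.Header().Set("Content-Type", "text/event-stream")
	for i := last; i < len(backlog); i++ {
		fmt.Fprintf(w, "id: %d\ndata: %s\n\n", i+1, backlog[i])
	}
}

// replay calls the handler in-process and returns the raw response body.
func replay(lastEventID string) string {
	req := httptest.NewRequest(http.MethodGet, "/events", nil)
	req.Header.Set("Accept", "text/event-stream")
	if lastEventID != "" {
		req.Header.Set("Last-Event-ID", lastEventID)
	}
	rec := httptest.NewRecorder()
	replayHandler(rec, req)
	return rec.Body.String()
}

func main() {
	// A client that last saw event 2 reconnects and receives events 3 and 4.
	fmt.Print(replay("2"))
}
```

Resumption of this kind only works when the server assigns IDs it can map back to a position in its own log; the client just echoes the last ID it saw.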