Runtime view¶
arc42 §6.
Four scenarios cover the connector framework’s externally-observable
behaviour. Each :refines: the requirements that govern its
behaviour and the building blocks that implement it.
The send path is fully zero-copy on the sender’s side: the codec
writes directly into shared memory via
sequenceDiagram
autonumber
participant U as user code
participant W as ChannelWriter
participant P as Publisher (taktora-executor)
participant SHM as iceoryx2 SHM
participant S as Subscriber (gateway)
participant GI as OutboundGatewayItem
participant BR as Tokio bridge
participant MQ as rumqttc::AsyncClient
participant B as Broker
U->>W: writer.send(&value)
W->>P: publisher.loan(|slot| codec.encode(value, slot.payload))
P->>SHM: zero-copy publish + notify
SHM-->>S: WaitSet wakes
S->>GI: ExecutableItem::execute()
GI->>BR: bridge_tx.try_send(payload, routing)
BR-->>MQ: tokio task drains bridge
MQ->>B: client.publish(topic, qos, retained, payload)
B-->>MQ: PUBACK (QoS 1)
|
The receive path mirrors the send path. The gateway’s tokio task pushes incoming protocol-stack messages into an inbound bridge channel; the inbound gateway item resolves the channel by topic match (with wildcard demultiplexing) and re-publishes the envelope into the inbound iceoryx2 service.
sequenceDiagram
autonumber
participant B as Broker
participant MQ as rumqttc::EventLoop
participant BR as Tokio bridge
participant GI as InboundGatewayItem
participant P as Publisher (gateway, in svc)
participant SHM as iceoryx2 SHM
participant S as Subscriber (app)
participant R as ChannelReader
participant U as user code
B->>MQ: PUBLISH (topic, payload)
MQ-->>BR: tokio task pushes (topic, payload) into bridge_in
BR->>GI: taktora-executor Channel wakes item
GI->>GI: resolve channel by topic match (wildcard demux)
GI->>P: publisher.loan(|slot| copy payload, set header)
P->>SHM: zero-copy publish + notify
SHM-->>S: WaitSet wakes
S->>R: reader.try_recv() → Received{ value, header }
R->>U: user code consumes value
|
Every connector implements the same state machine. Concrete connectors may add private sub-states, but the externally-visible variants are exactly four.
stateDiagram-v2
[*] --> Connecting: gateway started
Connecting --> Up: protocol stack reports connected
Up --> Degraded: transient error (e.g. PUBACK timeout)
Degraded --> Up: recovery
Degraded --> Connecting: recovery episode begins
Connecting --> Degraded: recover attempt failed
Up --> Down: stack-level disconnect
Degraded --> Down: error threshold exceeded
Down --> Connecting: ReconnectPolicy backoff elapses
Connecting --> Down: connect attempt fails
Up --> [*]: shutdown
Down --> [*]: shutdown
Every transition emits a |
Shutdown is downstream of taktora-executor: when
sequenceDiagram
autonumber
participant SIG as SIGINT/SIGTERM
participant EX as taktora-executor WaitSet
participant HO as ConnectorHost / Gateway
participant GI as Gateway items
participant TT as Tokio runtime
participant B as Broker
SIG->>EX: signal delivered
EX->>EX: WaitSet returns Interrupt
EX->>HO: Executor::run() returns
HO->>GI: drop items
HO->>TT: shutdown_handle.send(())
TT->>B: client.disconnect() (best-effort)
B-->>TT: DISCONNECT ack (or timeout)
TT->>TT: tokio runtime drained
HO-->>SIG: process exits
|
Bring-up walks the four EtherCAT bus states. PDO mapping is applied during the PRE-OP → SAFE-OP transition — the only window where SDO writes to the sync-manager assignment indices land on a stable mailbox but the cyclic process image is not yet live. Plugin traffic is accepted only after the bus reaches OP.
sequenceDiagram
autonumber
participant HO as ConnectorHost
participant GW as EthercatGateway
participant EC as ethercrab MainDevice
participant SD as SubDevices
participant PL as Plugin (EthercatConnector)
HO->>GW: start gateway
GW->>EC: PduStorage.try_split + MainDevice::new
GW->>EC: spawn tx_rx_task on tokio sidecar
GW->>EC: init_single_group (INIT to PRE-OP)
EC->>SD: discover and address SubDevices
GW->>SD: SDO writes 0x1C12 / 0x1C13 (PDO mapping)
GW->>EC: group.into_safe_op (PRE-OP to SAFE-OP)
GW->>EC: group.into_op (SAFE-OP to OP)
GW-->>HO: ConnectorHealth::Up
PL->>GW: writer.send / reader.try_recv accepted
|
The gateway runs one
stateDiagram-v2
[*] --> Connecting: gateway started
Connecting --> Up: bus reaches OP and WKC matches
Up --> Degraded: WKC below expected on N consecutive cycles
Degraded --> Up: WKC restored
Up --> Down: bus drops below OP
Degraded --> Down: WKC remains below expected past timeout
Down --> Connecting: ReconnectPolicy backoff elapses
Connecting --> Down: bring-up attempt fails
Up --> [*]: shutdown
Down --> [*]: shutdown
|
When
sequenceDiagram
autonumber
participant GW as EthercatGateway
participant EC as ethercrab MainDevice
participant SD as SubDevices
GW->>EC: read SupportFlags (0x0008) per SubDevice
EC->>SD: BRD 0x0008
SD-->>EC: 64-bit DC support flags
GW->>EC: latch port-0 receive times
EC->>SD: BWR 0x0900 (system time)
loop per DC-capable SubDevice
EC->>SD: read 0x0918 (receive time processing unit)
SD-->>EC: t_recv
EC->>SD: write 0x0920 (system time offset = t_sys - t_recv)
end
GW->>EC: propagate reference clock
EC->>SD: FRMW 0x0910 (first SubDevice clock)
|
The multi-reply request/response flow for Zenoh queries, derived
from Zenoh query handles (sub-bl... (BB_0043) and Reply framing uses a Zenoh-... (ADR_0043).
sequenceDiagram
autonumber
participant QU as user code (querier side)
participant ZQ as ZenohQuerier
participant SHM as iceoryx2 SHM
participant GW as ZenohGateway
participant ZN as zenoh::Session
participant RGW as ZenohGateway (responder)
participant ZA as ZenohQueryable
participant RU as user code (queryable side)
QU->>ZQ: send(q) → mints QueryId
ZQ->>SHM: publish {name}.query.out (correlation_id = QueryId, codec(q))
SHM-->>GW: WaitSet wakes
GW->>ZN: session.get(key_expr, payload)
ZN->>RGW: query delivered to queryable
RGW->>SHM: publish {name}.query.in (QueryId, q bytes)
SHM-->>ZA: ZenohQueryable.try_recv() → (QueryId, q)
ZA->>RU: user handles request, produces R chunks
loop per reply chunk
RU->>ZA: reply(id, r)
ZA->>SHM: publish {name}.reply.out (payload[0]=0x01, codec(r))
SHM-->>RGW: gateway forwards chunk
RGW->>ZN: query.reply(sample)
ZN->>GW: reply sample arrives on querier session
GW->>SHM: publish {name}.reply.in (payload[0]=0x01, r bytes)
SHM-->>ZQ: ZenohQuerier.try_recv() → data chunk
ZQ->>QU: decode(R) chunk delivered
end
RU->>ZA: terminate(id)
ZA->>SHM: publish {name}.reply.out (payload[0]=0x02 end-of-stream)
SHM-->>RGW: gateway drops zenoh::Query handle
GW->>SHM: publish {name}.reply.in (payload[0]=0x02 end-of-stream)
SHM-->>ZQ: try_recv() → end-of-stream
ZQ->>QU: stream complete
|
The CAN send path is the framework’s zero-copy publish path
(Send path (app → broker) (ARCH_0010)) terminated by a SocketCAN write. The codec
writes envelope payload bytes directly into shared memory via
sequenceDiagram
autonumber
participant U as user code
participant W as ChannelWriter
participant P as Publisher (taktora-executor)
participant SHM as iceoryx2 SHM
participant S as Subscriber (gateway)
participant GI as OutboundGatewayItem
participant BR as Tokio bridge (per-iface outbound)
participant SK as socketcan socket (iface)
participant BUS as CAN bus
U->>W: writer.send(&value)
W->>P: publisher.loan(|slot| codec.encode(value, slot.payload))
P->>SHM: zero-copy publish + notify
SHM-->>S: WaitSet wakes
S->>GI: ExecutableItem::execute()
GI->>BR: bridge_tx.try_send(payload, routing)
BR-->>SK: tokio task drains bridge
SK->>SK: build CanFrame{id, ext, data} or CanFdFrame{id, brs, esi, data}
SK->>BUS: write_frame()
|
The CAN receive path applies the per-interface kernel filter
(compiled by Per-iface filter compiler (... (BB_0074) from the union of registered
readers’
sequenceDiagram
autonumber
participant BUS as CAN bus
participant SK as socketcan socket (iface)
participant RX as RX task (tokio)
participant FC as filter / demux (BB_0074)
participant HC as error classifier (BB_0072)
participant BR as Tokio bridge (per-iface inbound)
participant GI as InboundGatewayItem
participant P as Publisher (gateway, in svc)
participant SHM as iceoryx2 SHM
participant S as Subscriber (app)
participant R as ChannelReader
participant U as user code
BUS->>SK: arbitration + ACK, data frame matches kernel filter
SK->>RX: read_frame() → CanFrame | CanFdFrame | error frame
alt error frame
RX->>HC: classify (passive / warning / bus-off)
HC->>HC: update per-iface sub-state and aggregate via worst-of
else data frame
RX->>FC: lookup matching readers by (can_id, mask, ext)
loop for each matching reader
FC->>BR: enqueue (descriptor, data)
end
BR->>GI: taktora-executor Channel wakes item
GI->>P: publisher.loan(|slot| copy payload bytes, set header)
P->>SHM: zero-copy publish + notify
SHM-->>S: WaitSet wakes
S->>R: reader.try_recv() → Received{ value, header }
R->>U: user code consumes value
end
|
Per-interface sub-state machine driven by error-frame
classification; aggregated into the connector’s single visible
stateDiagram-v2
[*] --> Connecting: iface socket bound, filter applied
Connecting --> Up: first successful read or successful send
Up --> Degraded: error-passive / error-warning frame received
Degraded --> Up: no further error frames for recovery window
Up --> Down: bus-off error frame received
Degraded --> Down: bus-off escalation
Down --> Connecting: ReconnectPolicy backoff elapses, socket reopened, filter re-applied
Connecting --> Down: socket reopen / filter apply fails
Up --> [*]: shutdown
Down --> [*]: shutdown
Aggregation rule (ConnectorHealth aggregates ... (REQ_0630)): any iface |