Class: ConvertSdk::VisitorsQueue Private
- Inherits:
- Object
  - Object
  - ConvertSdk::VisitorsQueue
- Defined in:
- lib/convert_sdk/visitors_queue.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
The per-visitor event queue — the in-memory buffer between the decision flow (which enqueues bucketing/conversion events) and ApiManager (which drains and POSTs them in the Convert wire format).
Per-visitor merge (structural invariant, FR36)
The queue holds ONE entry per visitor — a string-keyed wire-shaped hash
{"visitorId" => id, "segments" => {...}?, "events" => [...]}. Enqueuing an
event for a visitor already in the queue APPENDS to that visitor's events
array; it never adds a duplicate visitor entry and never flattens to a bare
event list. The platform attributes events by walking visitors[].events, so
flattening or duplicating corrupts report attribution. The structure itself
enforces the invariant — there is no public path that bypasses the merge.
(JS parity: api-manager.ts:117-144; PHP VisitorsQueue.php:64-70.)
segments ride on the visitor entry and are captured ONLY when the entry is
first created (omitted entirely when none are supplied) — a later enqueue for
the same visitor never overwrites them (JS if (segments) visitor.segments = …).
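The merge rule above can be illustrated with a minimal standalone sketch. It mirrors the shape described here but is not the SDK class itself: the `merge` lambda, the `"eventType"` key and the segment values are hypothetical and chosen only for illustration.

```ruby
# Hypothetical stand-in for the queue's ordered list of per-visitor entries.
items = []

# Append to the visitor's entry if one exists, otherwise create it.
# Segments are stored only when the entry is first created.
merge = lambda do |visitor_id, event, segments = nil|
  entry = items.find { |item| item["visitorId"] == visitor_id }
  if entry
    entry["events"] << event
  else
    entry = { "visitorId" => visitor_id, "events" => [event] }
    entry["segments"] = segments unless segments.nil?
    items << entry
  end
end

# Event payloads are made-up placeholders, not the real wire format.
merge.call("v1", { "eventType" => "bucketing" }, { "country" => "US" })
merge.call("v1", { "eventType" => "conversion" }, { "country" => "DE" })
merge.call("v2", { "eventType" => "bucketing" })
```

In this sketch `items` ends up with two visitor entries: the `v1` entry carries both events and keeps its first segments, and the `v2` entry has no `"segments"` key at all.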
Bounded memory (FR39/NFR10)
The queue is bounded at MAX_EVENTS EVENTS (events, not visitors). On
overflow the OLDEST event is dropped — and the visitor entry is removed once
its last event is gone — with a warn log per drop. An endpoint outage can
never grow host memory without bound; dropping the oldest (not the newest)
keeps the most recent traffic. (Optimizely DEFAULT_QUEUE_CAPACITY = 1000
precedent; research frozen register #7.)
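The drop-oldest policy might look roughly like the sketch below. The real trimming helper is not shown on this page, so `trim_oldest` is a hypothetical name, and treating the first event of the first visitor entry as "the oldest" is an assumption made for illustration. A cap of 3 is used instead of 1000 to keep the demo short.

```ruby
# Hypothetical helper: drop the oldest event until the total fits the cap.
# Assumption: the first event of the first entry is treated as the oldest.
def trim_oldest(items, size, cap, warnings)
  while size > cap
    oldest_entry = items.first
    oldest_entry["events"].shift
    # Remove the visitor entry once its last event is gone.
    items.shift if oldest_entry["events"].empty?
    size -= 1
    warnings << "queue full: dropped oldest event for visitor #{oldest_entry['visitorId']}"
  end
  size
end

items = [
  { "visitorId" => "v1", "events" => ["e1"] },
  { "visitorId" => "v2", "events" => ["e2", "e3", "e4"] }
]
warnings = [] # stands in for the warn log
size = trim_oldest(items, 4, 3, warnings)
```

Here one event over the cap removes `e1`, which also removes the now-empty `v1` entry, and records one warning.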
Thread safety (NFR2/NFR13)
Every operation is serialized by @queue_mutex. #enqueue is pure in-memory
and never blocks on I/O, so the calling request thread is never held on the
network. #drain! is an atomic drain-and-swap inside the lock returning the
drained visitors array — ApiManager builds the payload and POSTs OUTSIDE the
lock, so network I/O never holds the queue. The drained array is re-enqueueable
without violating the per-visitor merge (the retention path Story 4.2 needs).
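The drain-and-swap pattern can be sketched in isolation as follows. `SketchQueue` is a hypothetical, simplified stand-in (a flat list, no per-visitor merge or cap) meant only to show that the lock is held for the swap and not for the slow work that follows.

```ruby
# Hypothetical simplified queue: only the locking pattern is illustrated.
class SketchQueue
  def initialize
    @mutex = Thread::Mutex.new
    @items = []
  end

  # In-memory append under the lock; no I/O happens here.
  def push(item)
    @mutex.synchronize { @items << item }
  end

  # Swap the buffer for a fresh array inside the lock and return the old one.
  def drain!
    @mutex.synchronize do
      drained = @items
      @items = []
      drained
    end
  end
end

queue = SketchQueue.new
producers = 4.times.map do |n|
  Thread.new { 50.times { |i| queue.push("p#{n}-#{i}") } }
end
producers.each(&:join)

batch = queue.drain!      # the lock is released as soon as this returns
queue.push("late")        # lands in the fresh buffer, not in batch
next_batch = queue.drain! # the slow delivery of batch would happen outside the lock
```

Because the caller owns `batch` after the swap, anything pushed during delivery accumulates in the new buffer and is picked up by the next drain.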
Constant Summary
- MAX_EVENTS = 1000
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
The hard upper bound on buffered events (events, not visitors). Research frozen register #7; the JS SDK has no equivalent memory cap.
Instance Method Summary
-
#clear ⇒ void
private
Atomically empty the queue WITHOUT returning the entries (Story 4.4 child queue-ownership clear).
-
#drain! ⇒ Array<Hash{String=>Object}>
private
Atomically drain the queue: swap out the current per-visitor entries and reset to empty inside the lock, returning the drained array.
-
#enqueue(visitor_id, event, segments: nil) ⇒ void
private
Enqueue one wire-shaped event for visitor_id, merging into the visitor's existing entry (append) or creating a new one.
-
#initialize(log_manager:) ⇒ VisitorsQueue
constructor
private
A new instance of VisitorsQueue.
-
#requeue(visitors) ⇒ void
private
Re-enqueue previously drained per-visitor entries after a failed delivery (Story 4.2 failure retention), PRESERVING the per-visitor merge.
-
#size ⇒ Integer
private
The total number of buffered EVENTS (not visitors).
Constructor Details
#initialize(log_manager:) ⇒ VisitorsQueue
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of VisitorsQueue.
# File 'lib/convert_sdk/visitors_queue.rb', line 48
def initialize(log_manager:)
  @log_manager = log_manager
  # Thread safety: guarded by @queue_mutex. @items is the ordered list of
  # per-visitor entries; @size is the total event count (the cap dimension).
  @queue_mutex = Thread::Mutex.new
  @items = [] #: Array[Hash[String, untyped]]
  @size = 0
end
Instance Method Details
#clear ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Atomically empty the queue WITHOUT returning the entries (Story 4.4 child queue-ownership clear). A forked child inherits a COPY of the parent's queued events; clearing the child's copy ensures the child never double-delivers them, because the parent's timer still runs in the parent process and delivers them there. Distinct from #drain!, which returns the entries for delivery: this method just discards them. Idempotent.
# File 'lib/convert_sdk/visitors_queue.rb', line 139
def clear
  @queue_mutex.synchronize do
    @items = []
    @size = 0
  end
end
#drain! ⇒ Array<Hash{String=>Object}>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Atomically drain the queue: swap out the current per-visitor entries and reset to empty inside the lock, returning the drained array. The caller (ApiManager) builds the payload and POSTs OUTSIDE the lock.
# File 'lib/convert_sdk/visitors_queue.rb', line 88
def drain!
  @queue_mutex.synchronize do
    drained = @items
    @items = []
    @size = 0
    drained
  end
end
#enqueue(visitor_id, event, segments: nil) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Enqueue one wire-shaped event for visitor_id, merging into the visitor's
existing entry (append) or creating a new one. Pure in-memory — never blocks
on I/O. On overflow past MAX_EVENTS the oldest event is dropped (with a warn log).
# File 'lib/convert_sdk/visitors_queue.rb', line 67
def enqueue(visitor_id, event, segments: nil)
  @queue_mutex.synchronize do
    entry = @items.find { |item| item["visitorId"] == visitor_id }
    if entry
      entry["events"] << event
    else
      entry = { "visitorId" => visitor_id, "events" => [event] } #: Hash[String, untyped]
      entry["segments"] = segments unless segments.nil?
      @items << entry
    end
    @size += 1
    trim_to_cap
  end
end
#requeue(visitors) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Re-enqueue previously drained per-visitor entries after a failed delivery
(Story 4.2 failure retention), PRESERVING the per-visitor merge. Runs as one
atomic compound operation inside @queue_mutex.
The drained events are OLDER than anything the queue received during the failed POST, so they are placed BEFORE newer events. A drained visitor that already has a live entry (new events arrived for it mid-failure) has its drained events PREPENDED to that entry, never added as a duplicate visitor entry. A drained visitor with no live entry is inserted at the FRONT of the queue, since its events are the oldest. Segments are taken from whichever entry has them: the live entry wins; otherwise the drained entry's segments are adopted.
Re-enqueued events count toward MAX_EVENTS: a sustained outage that keeps requeuing drops the OLDEST events (one warn log per drop), so host memory stays bounded (NFR10).
# File 'lib/convert_sdk/visitors_queue.rb', line 116
def requeue(visitors)
  return if visitors.empty?

  @queue_mutex.synchronize do
    # Walk the drained entries in reverse so that successive front-inserts
    # preserve their original relative order at the head of the queue.
    visitors.reverse_each { |drained| merge_drained(drained) }
    trim_to_cap
  end
end
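The ordering rules described for requeue can be traced with a small standalone sketch. The private merge helper is not shown on this page, so `merge_drained_sketch` below is a hypothetical reconstruction from the prose only; it omits the mutex, the event counter and the cap trimming.

```ruby
# Hypothetical reconstruction of the merge step, based only on the description.
def merge_drained_sketch(items, drained)
  live = items.find { |item| item["visitorId"] == drained["visitorId"] }
  if live
    # Drained events are older, so they go in front of the live events.
    live["events"] = drained["events"] + live["events"]
    # The live entry's segments win; adopt the drained ones only if it has none.
    if !live.key?("segments") && drained.key?("segments")
      live["segments"] = drained["segments"]
    end
  else
    # No live entry: the drained visitor goes to the front of the queue.
    items.unshift(drained)
  end
end

# State after a failed POST: v2 received a new event while delivery was failing.
live_items = [{ "visitorId" => "v2", "events" => ["new-2"] }]
drained = [
  { "visitorId" => "v1", "events" => ["old-1"] },
  { "visitorId" => "v2", "events" => ["old-2"], "segments" => { "country" => "US" } }
]

# Reverse walk, so front-inserts keep the drained entries' relative order.
drained.reverse_each { |entry| merge_drained_sketch(live_items, entry) }
```

After the walk, `v1` sits at the front with its old event, and the single `v2` entry holds `old-2` before `new-2` along with the adopted segments.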
#size ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the total number of buffered EVENTS (not visitors).
# File 'lib/convert_sdk/visitors_queue.rb', line 128
def size
  @queue_mutex.synchronize { @size }
end