Class: ConvertSdk::VisitorsQueue Private

Inherits:
Object
Defined in:
lib/convert_sdk/visitors_queue.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

The per-visitor event queue — the in-memory buffer between the decision flow (which enqueues bucketing/conversion events) and ApiManager (which drains and POSTs them in the Convert wire format).

Per-visitor merge (structural invariant, FR36)

The queue holds ONE entry per visitor — a string-keyed wire-shaped hash {"visitorId" => id, "segments" => {...}?, "events" => [...]}. Enqueuing an event for a visitor already in the queue APPENDS to that visitor's events array; it never adds a duplicate visitor entry and never flattens to a bare event list. The platform attributes events by walking visitors[].events, so flattening or duplicating corrupts report attribution. The structure itself enforces the invariant — there is no public path that bypasses the merge. (JS parity: api-manager.ts:117-144; PHP VisitorsQueue.php:64-70.)

segments ride on the visitor entry and are captured ONLY when the entry is first created (omitted entirely when none are supplied) — a later enqueue for the same visitor never overwrites them (JS if (segments) visitor.segments = …).
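The merge and segment-capture rules above can be sketched with a plain array standing in for the queue's internal list. This is a minimal illustration, not the SDK class: `merge_event` is a hypothetical helper name, and the segment values are invented for the demo.

```ruby
# Hypothetical stand-in for the merge step; `items` plays the role of the
# queue's internal list of per-visitor entries. Not the SDK class itself.
def merge_event(items, visitor_id, event, segments: nil)
  entry = items.find { |item| item["visitorId"] == visitor_id }
  if entry
    # Existing visitor: append the event, leave any stored segments alone.
    entry["events"] << event
  else
    # First event for this visitor: create the entry and capture segments
    # only if some were supplied (the key is omitted otherwise).
    entry = { "visitorId" => visitor_id, "events" => [event] }
    entry["segments"] = segments unless segments.nil?
    items << entry
  end
  items
end

items = []
merge_event(items, "v1", { "eventType" => "bucketing" }, segments: { "country" => "US" })
merge_event(items, "v1", { "eventType" => "conversion" }, segments: { "country" => "DE" })
merge_event(items, "v2", { "eventType" => "bucketing" })

items.each { |entry| puts entry.inspect }
```

After these three calls the list holds two visitor entries: "v1" carries both events and keeps the segments from its first enqueue, and "v2" has no "segments" key at all.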

Bounded memory (FR39/NFR10)

The queue is bounded at MAX_EVENTS events (the cap counts events, not visitors). On overflow the OLDEST event is dropped, and the visitor entry is removed once its last event is gone, with a warn log per drop. An endpoint outage therefore cannot grow host memory without bound; dropping the oldest event (not the newest) keeps the most recent traffic. (Optimizely DEFAULT_QUEUE_CAPACITY = 1000 precedent; research frozen register #7.)
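The private trim helper is not shown on this page, so the following is only a sketch of how a drop-oldest cap could work. It assumes the oldest event is the first event of the first visitor entry; `trim_oldest`, the `warnings` array (a stand-in for the log manager), and the tiny cap are all hypothetical.

```ruby
# Hypothetical trim step, not the SDK's private helper. Assumes the oldest
# event is the first event of the first visitor entry.
def trim_oldest(items, size, cap, warnings)
  while size > cap
    oldest_entry = items.first
    visitor_id = oldest_entry["visitorId"]
    oldest_entry["events"].shift
    # Remove the visitor entry once its last event is gone.
    items.shift if oldest_entry["events"].empty?
    size -= 1
    warnings << "queue at capacity, dropped oldest event for visitor #{visitor_id}"
  end
  size
end

cap = 3 # tiny cap for the demo; the documented MAX_EVENTS is 1000
items = [
  { "visitorId" => "a", "events" => [{ "eventType" => "bucketing" }] },
  { "visitorId" => "b", "events" => [{ "eventType" => "bucketing" },
                                     { "eventType" => "conversion" },
                                     { "eventType" => "conversion" }] }
]
warnings = []
size = trim_oldest(items, 4, cap, warnings)
```

Here one event is over the cap, so visitor "a" loses its only event and its entry disappears, leaving three events for visitor "b" and one warning.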

Thread safety (NFR2/NFR13)

Every operation is serialized by @queue_mutex. #enqueue is pure in-memory and never blocks on I/O, so the calling request thread is never held on the network. #drain! is an atomic drain-and-swap inside the lock returning the drained visitors array — ApiManager builds the payload and POSTs OUTSIDE the lock, so network I/O never holds the queue. The drained array is re-enqueueable without violating the per-visitor merge (the retention path Story 4.2 needs).
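The locking pattern described above (short critical sections, drain-and-swap, slow work outside the lock) can be illustrated with a miniature queue. `TinyQueue` is a hypothetical class written for this example, and the `sleep` is a stand-in for the POST; neither is part of the SDK.

```ruby
# Hypothetical miniature of the locking pattern; not the SDK class.
class TinyQueue
  def initialize
    @mutex = Thread::Mutex.new
    @items = []
  end

  # In-memory only: the lock is held for a single array append.
  def push(item)
    @mutex.synchronize { @items << item }
    nil
  end

  # Swap the buffer out under the lock and hand the old one to the caller.
  def drain!
    @mutex.synchronize do
      drained = @items
      @items = []
      drained
    end
  end
end

queue = TinyQueue.new
delivered = []

producers = Array.new(4) do |t|
  Thread.new { 250.times { |i| queue.push([t, i]) } }
end

drainer = Thread.new do
  5.times do
    batch = queue.drain!     # lock held only for the swap
    sleep 0.001              # stand-in for the POST; no lock is held here
    delivered.concat(batch)
  end
end

producers.each(&:join)
drainer.join
delivered.concat(queue.drain!) # pick up anything pushed after the last drain
```

Because each drained batch is a private array once the swap completes, producers keep appending to the fresh buffer while the slow step runs, and no event is lost or delivered twice.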

Constant Summary

MAX_EVENTS =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

The hard upper bound on buffered events (events, not visitors). Research frozen register #7; the JS SDK has no equivalent memory cap.

1000

Instance Method Summary

Constructor Details

#initialize(log_manager:) ⇒ VisitorsQueue

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of VisitorsQueue.

Parameters:

  • log_manager (LogManager)

    the redacting logging surface (warn on overflow).



# File 'lib/convert_sdk/visitors_queue.rb', line 48

def initialize(log_manager:)
  @log_manager = log_manager
  # Thread safety: guarded by @queue_mutex. @items is the ordered list of
  # per-visitor entries; @size is the total event count (the cap dimension).
  @queue_mutex = Thread::Mutex.new
  @items = [] #: Array[Hash[String, untyped]]
  @size = 0
end

Instance Method Details

#clear ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Atomically empty the queue WITHOUT returning the entries (Story 4.4 child queue-ownership clear). A forked child inherits a COPY of the parent's queued events; clearing the child's copy ensures the child never double-delivers them (the parent's timer keeps running in the parent process and delivers them). Distinct from #drain!, which allocates and returns the entries for delivery; #clear just discards them. Idempotent.



# File 'lib/convert_sdk/visitors_queue.rb', line 139

def clear
  @queue_mutex.synchronize do
    @items = []
    @size = 0
  end
end
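The copy semantics that motivate #clear can be seen with a plain fork. This is a sketch under assumptions: a plain Array stands in for the queue's buffered events, the pipe exists only to report the child's state back, and the example is skipped on platforms without fork.

```ruby
# A plain Array stands in for the queue's buffered events (assumption).
events = [{ "eventType" => "bucketing" }]
child_size = nil

if Process.respond_to?(:fork)
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    events.clear                   # the child discards its inherited copy
    writer.write(events.size.to_s) # report the child's view of the buffer
    writer.close
    exit!(0)                       # skip at_exit hooks inherited from the parent
  end
  writer.close
  child_size = reader.read.to_i
  reader.close
  Process.wait(pid)
end
```

The child reports an empty buffer while the parent's `events` array still holds its event, which is why clearing in the child does not affect what the parent later delivers.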

#drain! ⇒ Array<Hash{String=>Object}>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Atomically drain the queue: swap out the current per-visitor entries and reset to empty inside the lock, returning the drained array. The caller (ApiManager) builds the payload and POSTs OUTSIDE the lock.

Returns:

  • (Array<Hash{String=>Object}>)

    the drained per-visitor entries (empty when nothing was queued); re-enqueueable verbatim.



# File 'lib/convert_sdk/visitors_queue.rb', line 88

def drain!
  @queue_mutex.synchronize do
    drained = @items
    @items = []
    @size = 0
    drained
  end
end

#enqueue(visitor_id, event, segments: nil) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Enqueue one wire-shaped event for visitor_id, merging into the visitor's existing entry (append) or creating a new one. Pure in-memory: never blocks on I/O. On overflow past MAX_EVENTS the oldest event is dropped (with a warn log).

Parameters:

  • visitor_id (String)

    the visitor the event belongs to.

  • event (Hash{String=>Object})

    a wire-shaped (string-keyed camelCase) event hash, e.g. {"eventType"=>"bucketing", "data"=>{...}}.

  • segments (Hash{String=>Object}, nil) (defaults to: nil)

    the visitor's report-segments, attached ONLY when this enqueue first creates the visitor's entry.



# File 'lib/convert_sdk/visitors_queue.rb', line 67

def enqueue(visitor_id, event, segments: nil)
  @queue_mutex.synchronize do
    entry = @items.find { |item| item["visitorId"] == visitor_id }
    if entry
      entry["events"] << event
    else
      entry = { "visitorId" => visitor_id, "events" => [event] } #: Hash[String, untyped]
      entry["segments"] = segments unless segments.nil?
      @items << entry
    end
    @size += 1
    trim_to_cap
  end
end

#requeue(visitors) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Re-enqueue previously drained per-visitor entries after a failed delivery (Story 4.2 failure retention), PRESERVING the per-visitor merge. Runs as one atomic compound operation inside @queue_mutex.

The drained events are OLDER than anything the queue received during the failed POST, so they are placed BEFORE newer events:

  • A drained visitor that already has a live entry (new events arrived for it mid-failure) has its drained events PREPENDED to that entry; a duplicate visitor entry is never created.

  • A drained visitor with no live entry is inserted at the FRONT of the queue (its events are the oldest).

Segments ride from whichever entry has them (the live entry wins; otherwise the drained entry's segments are adopted).

Re-enqueued events count toward MAX_EVENTS: a sustained outage that keeps requeuing drops the OLDEST events (one warn log per drop), so host memory stays bounded (NFR10).

Parameters:

  • visitors (Array<Hash{String=>Object}>)

    drained per-visitor entries (as returned by #drain!); an empty array is a no-op.



# File 'lib/convert_sdk/visitors_queue.rb', line 116

def requeue(visitors)
  return if visitors.empty?

  @queue_mutex.synchronize do
    # Walk the drained entries in reverse so that successive front-inserts
    # preserve their original relative order at the head of the queue.
    visitors.reverse_each { |drained| merge_drained(drained) }
    trim_to_cap
  end
end
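The private merge_drained helper is not shown on this page. The following is one possible sketch of the documented ordering and segment rules, written as free functions over a plain array; the function bodies, `requeue_sketch`, and the sample data are assumptions, and event-count accounting and cap trimming are left out.

```ruby
# Hypothetical sketch of the documented merge rules; the SDK's private
# merge_drained may differ. Size accounting and trimming are omitted.
def merge_drained(items, drained)
  live = items.find { |item| item["visitorId"] == drained["visitorId"] }
  if live
    # Drained events are older, so they go in front of the live events.
    live["events"] = drained["events"] + live["events"]
    # The live entry's segments win; adopt the drained ones only if absent.
    if !live.key?("segments") && drained.key?("segments")
      live["segments"] = drained["segments"]
    end
  else
    # No live entry: the drained visitor goes to the front of the queue.
    items.unshift(drained)
  end
end

def requeue_sketch(items, visitors)
  # Reverse walk so successive front-inserts keep their original order.
  visitors.reverse_each { |drained| merge_drained(items, drained) }
  items
end

# Live queue state: events that arrived while the POST was failing.
items = [
  { "visitorId" => "b", "events" => ["b2"] },
  { "visitorId" => "c", "events" => ["c1"] }
]
# Entries returned by the earlier drain, now being put back.
drained = [
  { "visitorId" => "a", "events" => ["a1"] },
  { "visitorId" => "b", "events" => ["b1"], "segments" => { "plan" => "pro" } }
]
requeue_sketch(items, drained)
```

After the call the queue order is a, b, c: visitor "a" is inserted at the front, visitor "b" keeps a single entry whose events read b1 then b2 and which adopts the drained segments, and visitor "c" is untouched.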

#size ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the total number of buffered EVENTS (not visitors).

Returns:

  • (Integer)

    the total number of buffered EVENTS (not visitors).



# File 'lib/convert_sdk/visitors_queue.rb', line 128

def size
  @queue_mutex.synchronize { @size }
end