~cytrogen/gstack

ref: dc0bae82d31bda5f9a5f714a6d43946600c55827 gstack/browse/src/activity.ts -rw-r--r-- 6.4 KiB
dc0bae82 — Garry Tan fix: sidebar agent uses real tab URL instead of stale Playwright URL (v0.12.6.0) (#544) 13 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
/**
 * Activity streaming — real-time feed of browse commands for the Chrome extension Side Panel
 *
 * Architecture:
 *   handleCommand() ──► emitActivity(command_start)
 *                   ──► emitActivity(command_end)
 *   wirePageEvents() ──► emitActivity(navigation)
 *
 *   GET /activity/stream?after=ID ──► SSE via ReadableStream
 *   GET /activity/history?limit=N ──► REST fallback
 *
 * Privacy: filterArgs() redacts passwords, auth tokens, and sensitive query params.
 * Backpressure: subscribers notified via queueMicrotask (never blocks command path).
 * Gap detection: client sends ?after=ID, server detects if ring buffer overflowed.
 */

import { CircularBuffer } from './buffers';

// ─── Types ──────────────────────────────────────────────────────

export interface ActivityEntry {
  id: number;
  timestamp: number;
  type: 'command_start' | 'command_end' | 'navigation' | 'error';
  command?: string;
  args?: string[];
  url?: string;
  duration?: number;
  status?: 'ok' | 'error';
  error?: string;
  result?: string;
  tabs?: number;
  mode?: string;
}

// ─── Buffer & Subscribers ───────────────────────────────────────

const BUFFER_CAPACITY = 1000;
const activityBuffer = new CircularBuffer<ActivityEntry>(BUFFER_CAPACITY);
let nextId = 1;

type ActivitySubscriber = (entry: ActivityEntry) => void;
const subscribers = new Set<ActivitySubscriber>();

// ─── Privacy Filtering ─────────────────────────────────────────

const SENSITIVE_COMMANDS = new Set(['fill', 'type', 'cookie', 'header']);
const SENSITIVE_PARAM_PATTERN = /\b(password|token|secret|key|auth|bearer|api[_-]?key)\b/i;

/**
 * Redact sensitive data from command args before streaming.
 */
export function filterArgs(command: string, args: string[]): string[] {
  if (!args || args.length === 0) return args;

  // fill: redact the value (last arg) for password-type fields
  if (command === 'fill' && args.length >= 2) {
    const selector = args[0];
    // If the selector suggests a password field, redact the value
    if (/password|passwd|secret|token/i.test(selector)) {
      return [selector, '[REDACTED]'];
    }
    return args;
  }

  // header: redact Authorization and other sensitive headers
  if (command === 'header' && args.length >= 1) {
    const headerLine = args[0];
    if (/^(authorization|x-api-key|cookie|set-cookie)/i.test(headerLine)) {
      const colonIdx = headerLine.indexOf(':');
      if (colonIdx > 0) {
        return [headerLine.substring(0, colonIdx + 1) + '[REDACTED]'];
      }
    }
    return args;
  }

  // cookie: redact cookie values
  if (command === 'cookie' && args.length >= 1) {
    const cookieStr = args[0];
    const eqIdx = cookieStr.indexOf('=');
    if (eqIdx > 0) {
      return [cookieStr.substring(0, eqIdx + 1) + '[REDACTED]'];
    }
    return args;
  }

  // type: always redact (could be a password field)
  if (command === 'type') {
    return ['[REDACTED]'];
  }

  // URL args: redact sensitive query params
  return args.map(arg => {
    if (arg.startsWith('http://') || arg.startsWith('https://')) {
      try {
        const url = new URL(arg);
        let redacted = false;
        for (const key of url.searchParams.keys()) {
          if (SENSITIVE_PARAM_PATTERN.test(key)) {
            url.searchParams.set(key, '[REDACTED]');
            redacted = true;
          }
        }
        return redacted ? url.toString() : arg;
      } catch {
        return arg;
      }
    }
    return arg;
  });
}

/**
 * Truncate result text for streaming (max 200 chars).
 */
function truncateResult(result: string | undefined): string | undefined {
  if (!result) return undefined;
  if (result.length <= 200) return result;
  return result.substring(0, 200) + '...';
}

// ─── Public API ─────────────────────────────────────────────────

/**
 * Emit an activity event. Backpressure-safe: subscribers notified asynchronously.
 */
export function emitActivity(entry: Omit<ActivityEntry, 'id' | 'timestamp'>): ActivityEntry {
  const full: ActivityEntry = {
    ...entry,
    id: nextId++,
    timestamp: Date.now(),
    args: entry.args ? filterArgs(entry.command || '', entry.args) : undefined,
    result: truncateResult(entry.result),
  };
  activityBuffer.push(full);

  // Notify subscribers asynchronously — never block the command path
  for (const notify of subscribers) {
    queueMicrotask(() => {
      try { notify(full); } catch { /* subscriber error — don't crash */ }
    });
  }

  return full;
}

/**
 * Subscribe to live activity events. Returns unsubscribe function.
 */
export function subscribe(fn: ActivitySubscriber): () => void {
  subscribers.add(fn);
  return () => subscribers.delete(fn);
}

/**
 * Get recent activity entries after the given cursor ID.
 * Returns entries and gap info if the buffer has overflowed.
 */
export function getActivityAfter(afterId: number): {
  entries: ActivityEntry[];
  gap: boolean;
  gapFrom?: number;
  availableFrom?: number;
  totalAdded: number;
} {
  const total = activityBuffer.totalAdded;
  const allEntries = activityBuffer.toArray();

  if (afterId === 0) {
    return { entries: allEntries, gap: false, totalAdded: total };
  }

  // Check for gap: if afterId is too old and has been evicted
  const oldestId = allEntries.length > 0 ? allEntries[0].id : nextId;
  if (afterId < oldestId) {
    return {
      entries: allEntries,
      gap: true,
      gapFrom: afterId + 1,
      availableFrom: oldestId,
      totalAdded: total,
    };
  }

  // Filter to entries after the cursor
  const filtered = allEntries.filter(e => e.id > afterId);
  return { entries: filtered, gap: false, totalAdded: total };
}

/**
 * Get the N most recent activity entries.
 */
export function getActivityHistory(limit: number = 50): {
  entries: ActivityEntry[];
  totalAdded: number;
} {
  const allEntries = activityBuffer.toArray();
  const sliced = limit < allEntries.length ? allEntries.slice(-limit) : allEntries;
  return { entries: sliced, totalAdded: activityBuffer.totalAdded };
}

/**
 * Get subscriber count (for debugging/health).
 */
export function getSubscriberCount(): number {
  return subscribers.size;
}