billing-store.mjs
175 lines 5.0 KB
Raw
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd feat(calendar): enforce agent context tiers in retrieval AP… Human minor ⚠ breaking 1 day ago
1 /**
2 * Persistent billing DB: local file data/hosted_billing.json or Netlify Blob (gateway-billing).
3 */
4 import { normalizeBillingUser } from './billing-logic.mjs';
5 import fs from 'fs/promises';
6 import path from 'path';
7 import { fileURLToPath } from 'url';
8
// Repository root: two directories above the folder containing this module.
// If the module URL cannot be converted to a file path, fall back to the
// current working directory of the process.
const projectRoot = (() => {
  try {
    const moduleDir = path.dirname(fileURLToPath(import.meta.url));
    return path.resolve(moduleDir, '..', '..');
  } catch {
    return process.cwd();
  }
})();

// JSON file used as the billing DB when no blob store is registered.
const BILLING_FILE = path.join(projectRoot, 'data', 'hosted_billing.json');
// Key under which the whole billing DB is stored in the blob store.
const BLOB_KEY = 'billing-db-v1';
// Upper bound on retained processed-event ids; the oldest ids are dropped first.
const MAX_EVENTS = 8000;
20
/**
 * Builds a fresh, empty billing DB.
 *
 * @returns {{ users: object, processed_events: Array }} a new object on every call
 */
function emptyDb() {
  const users = {};
  const processed_events = [];
  return { users, processed_events };
}
24
/**
 * Looks up the blob store registered on the global object.
 *
 * @returns {*} the value of `globalThis.__knowtation_gateway_blob` (undefined when unset)
 */
function getBlobStore() {
  const { __knowtation_gateway_blob: registeredStore } = globalThis;
  return registeredStore;
}
28
/**
 * Reads the billing DB from the blob store.
 *
 * @returns {Promise<object|null>} null when no blob store is registered, an empty
 *   DB when the stored value is falsy, otherwise the normalized stored value
 */
async function readFromBlob() {
  const blobStore = getBlobStore();
  if (!blobStore) {
    return null;
  }
  const payload = await blobStore.get(BLOB_KEY, { type: 'json' });
  return payload ? normalizeDb(payload) : emptyDb();
}
36
/**
 * Stores the billing DB in the blob store under the DB key.
 *
 * @param {object} db - billing DB to persist
 * @returns {Promise<void>}
 * @throws {Error} when no blob store is registered
 */
async function writeToBlob(db) {
  const blobStore = getBlobStore();
  if (blobStore) {
    await blobStore.setJSON(BLOB_KEY, db);
    return;
  }
  throw new Error('Netlify Blob store not configured');
}
42
/**
 * Reads and normalizes the billing DB from the local JSON file.
 *
 * @returns {Promise<object>} the normalized DB, or an empty DB when the file
 *   does not exist
 * @throws any other read, parse, or normalization error is rethrown unchanged
 */
async function readFromFile() {
  try {
    const text = await fs.readFile(BILLING_FILE, 'utf8');
    const parsed = JSON.parse(text);
    return normalizeDb(parsed);
  } catch (err) {
    // Only a missing file is treated as "no data yet".
    if (err.code !== 'ENOENT') throw err;
    return emptyDb();
  }
}
52
/**
 * Writes the billing DB to the local JSON file as 2-space-indented JSON,
 * creating the containing directory first if needed.
 *
 * @param {object} db - billing DB to persist
 * @returns {Promise<void>}
 */
async function writeToFile(db) {
  const targetDir = path.dirname(BILLING_FILE);
  await fs.mkdir(targetDir, { recursive: true });

  const serialized = JSON.stringify(db, null, 2);
  await fs.writeFile(BILLING_FILE, serialized, 'utf8');
}
57
/**
 * Coerces a raw stored value into the billing DB shape, in place.
 *
 * A non-object input is replaced by an empty DB. Missing or mistyped `users` /
 * `processed_events` fields are reset, and every user record is passed through
 * normalizeBillingUser.
 *
 * @param {*} raw - value read from storage
 * @returns {object} `raw` itself when it is an object, otherwise a new empty DB
 */
function normalizeDb(raw) {
  const isObjectLike = Boolean(raw) && typeof raw === 'object';
  const db = isObjectLike ? raw : emptyDb();

  const hasUsers = Boolean(db.users) && typeof db.users === 'object';
  if (!hasUsers) {
    db.users = {};
  }
  if (!Array.isArray(db.processed_events)) {
    db.processed_events = [];
  }

  for (const userRecord of Object.values(db.users)) {
    normalizeBillingUser(userRecord);
  }
  return db;
}
67
/**
 * Loads the billing DB from the blob store when one is registered, otherwise
 * from the local file.
 *
 * @returns {Promise<object>} the loaded billing DB
 */
export async function loadBillingDb() {
  const useBlob = Boolean(getBlobStore());
  return useBlob ? readFromBlob() : readFromFile();
}
74
/**
 * Persists the billing DB to the blob store when one is registered, otherwise
 * to the local file.
 *
 * @param {object} db - billing DB to persist
 * @returns {Promise<void>}
 */
export async function saveBillingDb(db) {
  const persist = getBlobStore() ? writeToBlob : writeToFile;
  await persist(db);
}
82
/**
 * Tail of the in-process mutation chain. Each mutateBillingDb call appends its
 * load / mutate / save cycle here, so cycles started from this process run one
 * after another instead of interleaving.
 *
 * Note: this only coordinates callers that share this module instance. Separate
 * Netlify function instances (cold starts, concurrent invocations on different
 * workers) are not serialized by it; the backing Blob store is the only shared
 * point between them.
 */
let _mutationQueue = Promise.resolve();

/**
 * Loads the billing DB, applies `fn`, trims the processed-event list and saves
 * the result, queued behind any mutation already in flight in this process.
 *
 * @param {(db: object) => void} fn - mutates db in place
 * @returns {Promise<void>} settles when this mutation has been saved; rejects
 *   with whatever the load, `fn`, or save threw
 */
export async function mutateBillingDb(fn) {
  const applyMutation = async () => {
    const db = await loadBillingDb();
    fn(db);
    trimEvents(db);
    await saveBillingDb(db);
  };

  const pending = _mutationQueue.then(applyMutation);
  // The queue continues from a rejection-free copy so one failed mutation does
  // not block later ones; the caller still receives the original rejection.
  _mutationQueue = pending.catch(() => {});
  return pending;
}
109
/**
 * Drops the oldest processed-event ids so that at most `maxEvents` remain.
 * The array is trimmed in place (same array instance), from the front.
 *
 * @param {{ processed_events: Array }} db - billing DB whose event list is trimmed
 * @param {number} [maxEvents=MAX_EVENTS] - maximum number of ids to keep
 * @returns {void}
 */
function trimEvents(db, maxEvents = MAX_EVENTS) {
  const events = db.processed_events;
  const overflow = events.length - maxEvents;
  // One splice removes the whole surplus instead of shifting one id at a time.
  if (overflow > 0) {
    events.splice(0, overflow);
  }
}
115
/**
 * Tells whether an event id is already recorded in the DB.
 *
 * @param {{ processed_events: Array }} db - billing DB
 * @param {*} eventId - id to look up
 * @returns {boolean} true when `eventId` is in `db.processed_events`
 */
export function eventAlreadyProcessed(db, eventId) {
  const { processed_events: recordedIds } = db;
  return recordedIds.includes(eventId);
}
119
/**
 * Records an event id in the DB unless it is already present.
 *
 * @param {{ processed_events: Array }} db - billing DB, mutated in place
 * @param {*} eventId - id to record
 * @returns {void}
 */
export function markEventProcessed(db, eventId) {
  const { processed_events: recordedIds } = db;
  if (recordedIds.includes(eventId)) {
    return;
  }
  recordedIds.push(eventId);
}
123
/**
 * Finds the id of the first user whose `stripe_customer_id` equals `customerId`.
 *
 * @param {{ users: object }} db - billing DB
 * @param {*} customerId - customer id to search for
 * @returns {string|null} the matching user id, or null when `customerId` is
 *   falsy or no user matches
 */
export function findUserIdByCustomerId(db, customerId) {
  if (!customerId) {
    return null;
  }
  const match = Object.entries(db.users).find(
    ([, userRecord]) => userRecord.stripe_customer_id === customerId,
  );
  return match ? match[0] : null;
}
131
/**
 * Returns a copy of `date` moved forward by `months` calendar months, computed
 * in UTC. The day of month is clamped to the last day of the target month, so
 * Jan 31 + 1 month yields the last day of February instead of spilling into March.
 *
 * @param {Date} date - valid starting date (not modified)
 * @param {number} months - whole number of months to add (0 returns an equal date)
 * @returns {Date} the shifted date
 */
function addUtcMonthsClamped(date, months) {
  const shifted = new Date(date.getTime());
  const anchorDay = shifted.getUTCDate();
  // Park on day 1 first so changing the month can never overflow.
  shifted.setUTCDate(1);
  shifted.setUTCMonth(shifted.getUTCMonth() + months);
  const lastDayOfMonth = new Date(
    Date.UTC(shifted.getUTCFullYear(), shifted.getUTCMonth() + 1, 0),
  ).getUTCDate();
  shifted.setUTCDate(Math.min(anchorDay, lastDayOfMonth));
  return shifted;
}

/**
 * Tells whether a stored `period_end` value is a valid date strictly before `now`.
 * Missing (falsy) or unparseable values are never considered expired.
 *
 * @param {*} periodEndValue - stored period_end value
 * @param {Date} now - reference time
 * @returns {boolean}
 */
function isPeriodExpired(periodEndValue, now) {
  if (!periodEndValue) return false;
  const periodEnd = new Date(periodEndValue);
  return !Number.isNaN(periodEnd.getTime()) && now > periodEnd;
}

/**
 * If the user's billing period has expired, reset the monthly usage counters
 * (monthly_indexing_tokens_used, monthly_used_cents, monthly_searches_used,
 * monthly_index_jobs_used, monthly_consolidation_jobs_used) to 0 and roll
 * period_start / period_end forward to the billing month that contains "now".
 *
 * The period is advanced by as many whole calendar months as needed, always
 * measured from the stored period_end, so a period that lapsed several months
 * ago is caught up in one call instead of triggering a reset on every call.
 *
 * This is a client-side guard for cases where the `invoice.paid` webhook is delayed or missed.
 * Only the fields listed above are written; every other field on the user record
 * is left untouched. Nothing happens when the user is unknown, or when period_end
 * is missing, unparseable, or not yet reached.
 *
 * @param {string} userId
 * @returns {Promise<void>}
 */
export async function resetMonthlyTokensIfNeeded(userId) {
  if (!userId) return;

  // Cheap pre-check outside the mutation queue: skip the write in the common case.
  const db = await loadBillingDb();
  const snapshot = db.users[userId];
  if (!snapshot) return;

  const now = new Date();
  if (!isPeriodExpired(snapshot.period_end, now)) return;

  await mutateBillingDb((dbMut) => {
    // Re-check against the freshly loaded record; another mutation may have
    // renewed the period since the pre-check.
    const user = dbMut.users[userId];
    if (!user || !isPeriodExpired(user.period_end, now)) return;

    // Reset all monthly counters.
    user.monthly_indexing_tokens_used = 0;
    user.monthly_used_cents = 0;
    user.monthly_searches_used = 0;
    user.monthly_index_jobs_used = 0;
    user.monthly_consolidation_jobs_used = 0;

    // Advance from the stored period_end until the new period_end is in the future.
    const previousEnd = new Date(user.period_end);
    let monthsAhead = 1;
    let newEnd = addUtcMonthsClamped(previousEnd, monthsAhead);
    while (newEnd <= now) {
      monthsAhead += 1;
      newEnd = addUtcMonthsClamped(previousEnd, monthsAhead);
    }
    const newStart = addUtcMonthsClamped(previousEnd, monthsAhead - 1);

    user.period_start = newStart.toISOString();
    user.period_end = newEnd.toISOString();
  });
}
File History 2 commits
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd feat(calendar): enforce agent context tiers in retrieval AP… Human minor 1 day ago
sha256:9103f98c89257ed2b01c237cea895dabb3e85ea337dccb1161c175e4422355b6 docs: accept Calendar Events v0 spec with Phase 0 security … Human 1 day ago