Files
dealplustech/dealplustech-astro/node_modules/@libsql/hrana-client/lib-esm/http/cursor.js
Kunthawat b2e427791b feat: Add complete PDPA compliance pages
- Admin dashboard (/admin/consent-logs) with password auth
- Consent API (/api/consent) with SQLite + IP hashing
- Privacy Policy (Thai) - PDPA Section 36 compliant
- Terms & Conditions (Thai) - 9 standard clauses
- .env.example template with Umami placeholder

All pages preserve current design system.
2026-03-10 21:28:23 +07:00

139 lines
4.9 KiB
JavaScript

import { ByteQueue } from "../byte_queue.js";
import { Cursor } from "../cursor.js";
import * as jsond from "../encoding/json/decode.js";
import * as protobufd from "../encoding/protobuf/decode.js";
import { ClientError, ClosedError, ProtoError, InternalError } from "../errors.js";
import { impossible } from "../util.js";
import { CursorRespBody as json_CursorRespBody } from "./json_decode.js";
import { CursorRespBody as protobuf_CursorRespBody } from "./protobuf_decode.js";
import { CursorEntry as json_CursorEntry } from "../shared/json_decode.js";
import { CursorEntry as protobuf_CursorEntry } from "../shared/protobuf_decode.js";
export class HttpCursor extends Cursor {
#stream;
#encoding;
#reader;
#queue;
#closed;
#done;
/** @private */
constructor(stream, encoding) {
super();
this.#stream = stream;
this.#encoding = encoding;
this.#reader = undefined;
this.#queue = new ByteQueue(16 * 1024);
this.#closed = undefined;
this.#done = false;
}
async open(response) {
if (response.body === null) {
throw new ProtoError("No response body for cursor request");
}
// node-fetch do not fully support WebStream API, especially getReader() function
// see https://github.com/node-fetch/node-fetch/issues/387
// so, we are using async iterator which behaves similarly here instead
this.#reader = response.body[Symbol.asyncIterator]();
const respBody = await this.#nextItem(json_CursorRespBody, protobuf_CursorRespBody);
if (respBody === undefined) {
throw new ProtoError("Empty response to cursor request");
}
return respBody;
}
/** Fetch the next entry from the cursor. */
next() {
return this.#nextItem(json_CursorEntry, protobuf_CursorEntry);
}
/** Close the cursor. */
close() {
this._setClosed(new ClientError("Cursor was manually closed"));
}
/** @private */
_setClosed(error) {
if (this.#closed !== undefined) {
return;
}
this.#closed = error;
this.#stream._cursorClosed(this);
if (this.#reader !== undefined) {
this.#reader.return();
}
}
/** True if the cursor is closed. */
get closed() {
return this.#closed !== undefined;
}
async #nextItem(jsonFun, protobufDef) {
for (;;) {
if (this.#done) {
return undefined;
}
else if (this.#closed !== undefined) {
throw new ClosedError("Cursor is closed", this.#closed);
}
if (this.#encoding === "json") {
const jsonData = this.#parseItemJson();
if (jsonData !== undefined) {
const jsonText = new TextDecoder().decode(jsonData);
const jsonValue = JSON.parse(jsonText);
return jsond.readJsonObject(jsonValue, jsonFun);
}
}
else if (this.#encoding === "protobuf") {
const protobufData = this.#parseItemProtobuf();
if (protobufData !== undefined) {
return protobufd.readProtobufMessage(protobufData, protobufDef);
}
}
else {
throw impossible(this.#encoding, "Impossible encoding");
}
if (this.#reader === undefined) {
throw new InternalError("Attempted to read from HTTP cursor before it was opened");
}
const { value, done } = await this.#reader.next();
if (done && this.#queue.length === 0) {
this.#done = true;
}
else if (done) {
throw new ProtoError("Unexpected end of cursor stream");
}
else {
this.#queue.push(value);
}
}
}
#parseItemJson() {
const data = this.#queue.data();
const newlineByte = 10;
const newlinePos = data.indexOf(newlineByte);
if (newlinePos < 0) {
return undefined;
}
const jsonData = data.slice(0, newlinePos);
this.#queue.shift(newlinePos + 1);
return jsonData;
}
#parseItemProtobuf() {
const data = this.#queue.data();
let varintValue = 0;
let varintLength = 0;
for (;;) {
if (varintLength >= data.byteLength) {
return undefined;
}
const byte = data[varintLength];
varintValue |= (byte & 0x7f) << (7 * varintLength);
varintLength += 1;
if (!(byte & 0x80)) {
break;
}
}
if (data.byteLength < varintLength + varintValue) {
return undefined;
}
const protobufData = data.slice(varintLength, varintLength + varintValue);
this.#queue.shift(varintLength + varintValue);
return protobufData;
}
}