diff --git a/libs/common/src/abstractions/api.service.ts b/libs/common/src/abstractions/api.service.ts index 44b5e34a4a4..8e26707a88f 100644 --- a/libs/common/src/abstractions/api.service.ts +++ b/libs/common/src/abstractions/api.service.ts @@ -451,8 +451,9 @@ export abstract class ApiService { * Posts events for a user * @param request The array of events to upload * @param userId The optional user id the events belong to. If no user id is provided the active user id is used. + * @returns The list of events that failed to upload, or an empty array if all events were uploaded successfully. */ - postEventsCollect: (request: EventRequest[], userId?: UserId) => Promise; + postEventsCollect: (request: EventRequest[], userId?: UserId) => Promise; deleteSsoUser: (organizationId: string) => Promise; getSsoUserIdentifier: () => Promise; diff --git a/libs/common/src/platform/misc/utils.spec.ts b/libs/common/src/platform/misc/utils.spec.ts index 818138863fb..9a850be7ac2 100644 --- a/libs/common/src/platform/misc/utils.spec.ts +++ b/libs/common/src/platform/misc/utils.spec.ts @@ -775,4 +775,42 @@ describe("Utils Service", () => { }); }); }); + + describe("chunkArray", () => { + it("handles empty array", () => { + const result = Utils.chunkArray([], 3); + expect(result).toEqual([]); + }); + + it("handles null array", () => { + const result = Utils.chunkArray(null!, 3); + expect(result).toEqual([]); + }); + + it.each([0, -1, null!])("handles errors on invalid chunk sizes", (chunkSize: number) => { + expect(() => Utils.chunkArray([1, 2, 3], chunkSize)).toThrow( + `Chunk size must be greater than 0`, + ); + }); + + it("handles chunk size of 1", () => { + const result = Utils.chunkArray([1, 2, 3], 1); + expect(result).toEqual([[1], [2], [3]]); + }); + + it("handles chunk size equal to array length", () => { + const result = Utils.chunkArray([1, 2, 3], 3); + expect(result).toEqual([[1, 2, 3]]); + }); + + it("handles chunk size greater than array length", () => { + const result = Utils.chunkArray([1, 2, 3], 5); + expect(result).toEqual([[1, 2, 3]]); + }); + + it("handles chunk size less than array length", () => { + const result = Utils.chunkArray([1, 2, 3, 4, 5], 2); + expect(result).toEqual([[1, 2], [3, 4], [5]]); + }); + }); }); diff --git a/libs/common/src/platform/misc/utils.ts b/libs/common/src/platform/misc/utils.ts index b3c1db91806..443477a419c 100644 --- a/libs/common/src/platform/misc/utils.ts +++ b/libs/common/src/platform/misc/utils.ts @@ -6,7 +6,7 @@ import * as path from "path"; import { Buffer as BufferLib } from "buffer/"; import { Observable, of, switchMap } from "rxjs"; import { getHostname, parse } from "tldts"; -import { Merge } from "type-fest"; +import { Merge, NonNegative } from "type-fest"; // This import has been flagged as unallowed for this class. It may be involved in a circular dependency loop. // eslint-disable-next-line no-restricted-imports @@ -621,6 +621,21 @@ export class Utils { return null; } + + static chunkArray(a: T[], chunkSize: NonNegative): T[][] { + if (chunkSize <= 0) { + throw new Error("Chunk size must be greater than 0"); + } + if (a == null || a.length === 0) { + return []; + } + + const res = []; + for (let i = 0; i < a.length; i += chunkSize) { + res.push(a.slice(i, i + chunkSize)); + } + return res; + } } Utils.init(); diff --git a/libs/common/src/services/api.service.ts b/libs/common/src/services/api.service.ts index 1971cd86363..7c2e18009f9 100644 --- a/libs/common/src/services/api.service.ts +++ b/libs/common/src/services/api.service.ts @@ -1407,7 +1407,7 @@ export class ApiService implements ApiServiceAbstraction { return new ListResponse(r, EventResponse); } - async postEventsCollect(request: EventRequest[], userId?: UserId): Promise { + async postEventsCollect(requests: EventRequest[], userId?: UserId): Promise { const authHeader = await this.tokenService.getAccessToken(userId); const headers = new Headers({ "Device-Type": this.deviceType, @@ -1418,18 +1418,37 @@ export class ApiService implements ApiServiceAbstraction { headers.set("User-Agent", this.customUserAgent); } const env = await firstValueFrom(this.environmentService.environment$); - const response = await this.fetch( - this.httpOperations.createRequest(env.getEventsUrl() + "/collect", { - cache: "no-store", - credentials: await this.getCredentials(), - method: "POST", - body: JSON.stringify(request), - headers: headers, - }), - ); - if (response.status !== 200) { - return Promise.reject("Event post failed."); - } + // TODO: MDG stringify the request array into requests of no more than 50 KiB + + // Break uploads into chunks of 100 events + let bail = false; + const failedRequests: EventRequest[] = []; + Utils.chunkArray(requests, 300).forEach(async (eventRequests) => { + // We only fail once per set of uploads + if (bail) { + failedRequests.push(...eventRequests); + return; + } + + try { + const response = await this.fetch( + this.httpOperations.createRequest(env.getEventsUrl() + "/collect", { + cache: "no-store", + credentials: await this.getCredentials(), + method: "POST", + body: JSON.stringify(eventRequests), + headers: headers, + }), + ); + if (response.status !== 200) { + throw new Error("Event post failed."); + } + } catch { + bail = true; + failedRequests.push(...eventRequests); + } + }); + return failedRequests; } // User APIs diff --git a/libs/common/src/services/event/event-upload.service.ts b/libs/common/src/services/event/event-upload.service.ts index d3836a0030f..b36c89b4478 100644 --- a/libs/common/src/services/event/event-upload.service.ts +++ b/libs/common/src/services/event/event-upload.service.ts @@ -71,7 +71,7 @@ export class EventUploadService implements EventUploadServiceAbstraction { if (eventCollection == null || eventCollection.length === 0) { return; } - const request = eventCollection.map((e) => { + const eventRequests = eventCollection.map((e) => { const req = new EventRequest(); req.type = e.type; req.cipherId = e.cipherId; @@ -79,12 +79,26 @@ export class EventUploadService implements EventUploadServiceAbstraction { req.organizationId = e.organizationId; return req; }); + + let failedEvents: EventRequest[]; try { - await this.apiService.postEventsCollect(request, userId); + failedEvents = await this.apiService.postEventsCollect(eventRequests, userId); } catch (e) { this.logService.error(e); - // Add the events back to state if there was an error and they were not uploaded. - await this.stateProvider.setUserState(EVENT_COLLECTION, eventCollection, userId); + failedEvents = eventRequests; + } + + // Add any events that failed to upload back to state. + if (failedEvents && failedEvents.length > 0) { + const failedEventData = failedEvents.map((e) => { + const eventData = new EventData(); + eventData.type = e.type; + eventData.cipherId = e.cipherId; + eventData.date = e.date; + eventData.organizationId = e.organizationId; + return eventData; + }); + await this.stateProvider.setUserState(EVENT_COLLECTION, failedEventData, userId); } }