[MM-48375 Gekidou] Threads Sync Fix (#6788)

* Init

* Test fix

* New sync implementation

* misc

* Includes migration and other servers sync

* Misc

* Migration fix

* Migration is done version 7

* Update app/queries/servers/thread.ts

Co-authored-by: Elias Nahum <nahumhbl@gmail.com>

* Update app/database/operator/server_data_operator/handlers/team_threads_sync.ts

Co-authored-by: Elias Nahum <nahumhbl@gmail.com>

* Feedback changes

* Fixes when old thread gets a reply

* Fix

Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
Co-authored-by: Elias Nahum <nahumhbl@gmail.com>
This commit is contained in:
Anurag Shivarathri
2022-12-01 21:38:27 +05:30
committed by GitHub
parent f3f5cef8d1
commit 0e5d63a7c3
34 changed files with 631 additions and 336 deletions

View File

@@ -0,0 +1,71 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
import {Q, Database} from '@nozbe/watermelondb';
import {MM_TABLES} from '@constants/database';
import {transformTeamThreadsSyncRecord} from '@database/operator/server_data_operator/transformers/thread';
import {getRawRecordPairs, getUniqueRawsBy, getValidRecordsForUpdate} from '@database/operator/utils/general';
import {logWarning} from '@utils/log';
import type {HandleTeamThreadsSyncArgs, RecordPair} from '@typings/database/database';
import type TeamThreadsSyncModel from '@typings/database/models/servers/team_threads_sync';
export interface TeamThreadsSyncHandlerMix {
handleTeamThreadsSync: ({data, prepareRecordsOnly}: HandleTeamThreadsSyncArgs) => Promise<TeamThreadsSyncModel[]>;
}
const {TEAM_THREADS_SYNC} = MM_TABLES.SERVER;
const TeamThreadsSyncHandler = (superclass: any) => class extends superclass {
handleTeamThreadsSync = async ({data, prepareRecordsOnly = false}: HandleTeamThreadsSyncArgs): Promise<TeamThreadsSyncModel[]> => {
if (!data || !data.length) {
logWarning(
'An empty or undefined "data" array has been passed to the handleTeamThreadsSync method',
);
return [];
}
const uniqueRaws = getUniqueRawsBy({raws: data, key: 'id'}) as TeamThreadsSync[];
const ids = uniqueRaws.map((item) => item.id);
const chunks = await (this.database as Database).get<TeamThreadsSyncModel>(TEAM_THREADS_SYNC).query(
Q.where('id', Q.oneOf(ids)),
).fetch();
const chunksMap = chunks.reduce((result: Record<string, TeamThreadsSyncModel>, chunk) => {
result[chunk.id] = chunk;
return result;
}, {});
const create: TeamThreadsSync[] = [];
const update: RecordPair[] = [];
for await (const item of uniqueRaws) {
const {id} = item;
const chunk = chunksMap[id];
if (chunk) {
update.push(getValidRecordsForUpdate({
tableName: TEAM_THREADS_SYNC,
newValue: item,
existingRecord: chunk,
}));
} else {
create.push(item);
}
}
const models = (await this.prepareRecords({
createRaws: getRawRecordPairs(create),
updateRaws: update,
transformer: transformTeamThreadsSyncRecord,
tableName: TEAM_THREADS_SYNC,
})) as TeamThreadsSyncModel[];
if (models?.length && !prepareRecordsOnly) {
await this.batchRecords(models);
}
return models;
};
};
export default TeamThreadsSyncHandler;

View File

@@ -2,7 +2,7 @@
// See LICENSE.txt for license information.
import DatabaseManager from '@database/manager';
import {transformThreadRecord, transformThreadParticipantRecord, transformThreadInTeamRecord} from '@database/operator/server_data_operator/transformers/thread';
import {transformThreadRecord, transformThreadParticipantRecord, transformThreadInTeamRecord, transformTeamThreadsSyncRecord} from '@database/operator/server_data_operator/transformers/thread';
import ServerDataOperator from '..';
@@ -51,7 +51,7 @@ describe('*** Operator: Thread Handlers tests ***', () => {
] as ThreadWithLastFetchedAt[];
const threadsMap = {team_id_1: threads};
await operator.handleThreads({threads, loadedInGlobalThreads: false, prepareRecordsOnly: false, teamId: 'team_id_1'});
await operator.handleThreads({threads, prepareRecordsOnly: false, teamId: 'team_id_1'});
expect(spyOnHandleRecords).toHaveBeenCalledWith({
fieldName: 'id',
@@ -76,7 +76,6 @@ describe('*** Operator: Thread Handlers tests ***', () => {
expect(spyOnHandleThreadInTeam).toHaveBeenCalledWith({
threadsMap,
prepareRecordsOnly: true,
loadedInGlobalThreads: false,
});
// Only one batch operation for both tables
@@ -161,21 +160,77 @@ describe('*** Operator: Thread Handlers tests ***', () => {
team_id_2: team2Threads,
};
await operator.handleThreadInTeam({threadsMap, loadedInGlobalThreads: true, prepareRecordsOnly: false});
await operator.handleThreadInTeam({threadsMap, prepareRecordsOnly: false});
expect(spyOnPrepareRecords).toHaveBeenCalledWith({
createRaws: [{
raw: {team_id: 'team_id_1', thread_id: 'thread-2', loaded_in_global_threads: true},
raw: {team_id: 'team_id_1', thread_id: 'thread-1'},
}, {
raw: {team_id: 'team_id_2', thread_id: 'thread-2', loaded_in_global_threads: true},
raw: {team_id: 'team_id_1', thread_id: 'thread-2'},
}, {
raw: {team_id: 'team_id_2', thread_id: 'thread-2'},
}],
transformer: transformThreadInTeamRecord,
updateRaws: [
expect.objectContaining({
raw: {team_id: 'team_id_1', thread_id: 'thread-1', loaded_in_global_threads: true},
}),
],
tableName: 'ThreadsInTeam',
});
});
it('=> HandleTeamThreadsSync: should write to the the TeamThreadsSync table', async () => {
expect.assertions(1);
const spyOnPrepareRecords = jest.spyOn(operator, 'prepareRecords');
const data = [
{
id: 'team_id_1',
earliest: 100,
latest: 200,
},
{
id: 'team_id_2',
earliest: 100,
latest: 300,
},
] as TeamThreadsSync[];
await operator.handleTeamThreadsSync({data, prepareRecordsOnly: false});
expect(spyOnPrepareRecords).toHaveBeenCalledWith({
createRaws: [{
raw: {id: 'team_id_1', earliest: 100, latest: 200},
}, {
raw: {id: 'team_id_2', earliest: 100, latest: 300},
}],
updateRaws: [],
transformer: transformTeamThreadsSyncRecord,
tableName: 'TeamThreadsSync',
});
});
it('=> HandleTeamThreadsSync: should update the record in TeamThreadsSync table', async () => {
expect.assertions(1);
const spyOnPrepareRecords = jest.spyOn(operator, 'prepareRecords');
const data = [
{
id: 'team_id_1',
earliest: 100,
latest: 300,
},
] as TeamThreadsSync[];
await operator.handleTeamThreadsSync({data, prepareRecordsOnly: false});
expect(spyOnPrepareRecords).toHaveBeenCalledWith({
createRaws: [],
updateRaws: [
expect.objectContaining({
raw: {id: 'team_id_1', earliest: 100, latest: 300},
}),
],
transformer: transformTeamThreadsSyncRecord,
tableName: 'TeamThreadsSync',
});
});
});

View File

@@ -25,7 +25,7 @@ const {
} = MM_TABLES.SERVER;
export interface ThreadHandlerMix {
handleThreads: ({threads, teamId, prepareRecordsOnly, loadedInGlobalThreads}: HandleThreadsArgs) => Promise<Model[]>;
handleThreads: ({threads, teamId, prepareRecordsOnly}: HandleThreadsArgs) => Promise<Model[]>;
handleThreadParticipants: ({threadsParticipants, prepareRecordsOnly}: HandleThreadParticipantsArgs) => Promise<ThreadParticipantModel[]>;
}
@@ -37,7 +37,7 @@ const ThreadHandler = (superclass: any) => class extends superclass {
* @param {boolean | undefined} handleThreads.prepareRecordsOnly
* @returns {Promise<void>}
*/
handleThreads = async ({threads, teamId, loadedInGlobalThreads, prepareRecordsOnly = false}: HandleThreadsArgs): Promise<Model[]> => {
handleThreads = async ({threads, teamId, prepareRecordsOnly = false}: HandleThreadsArgs): Promise<Model[]> => {
if (!threads?.length) {
logWarning(
'An empty or undefined "threads" array has been passed to the handleThreads method',
@@ -119,7 +119,6 @@ const ThreadHandler = (superclass: any) => class extends superclass {
const threadsInTeam = await this.handleThreadInTeam({
threadsMap: {[teamId]: threads},
prepareRecordsOnly: true,
loadedInGlobalThreads,
}) as ThreadInTeamModel[];
batch.push(...threadsInTeam);
}

View File

@@ -5,20 +5,20 @@ import {Q, Database} from '@nozbe/watermelondb';
import {MM_TABLES} from '@constants/database';
import {transformThreadInTeamRecord} from '@database/operator/server_data_operator/transformers/thread';
import {getRawRecordPairs, getValidRecordsForUpdate} from '@database/operator/utils/general';
import {getRawRecordPairs} from '@database/operator/utils/general';
import {logWarning} from '@utils/log';
import type {HandleThreadInTeamArgs, RecordPair} from '@typings/database/database';
import type {HandleThreadInTeamArgs} from '@typings/database/database';
import type ThreadInTeamModel from '@typings/database/models/servers/thread_in_team';
export interface ThreadInTeamHandlerMix {
handleThreadInTeam: ({threadsMap, loadedInGlobalThreads, prepareRecordsOnly}: HandleThreadInTeamArgs) => Promise<ThreadInTeamModel[]>;
handleThreadInTeam: ({threadsMap, prepareRecordsOnly}: HandleThreadInTeamArgs) => Promise<ThreadInTeamModel[]>;
}
const {THREADS_IN_TEAM} = MM_TABLES.SERVER;
const ThreadInTeamHandler = (superclass: any) => class extends superclass {
handleThreadInTeam = async ({threadsMap, loadedInGlobalThreads, prepareRecordsOnly = false}: HandleThreadInTeamArgs): Promise<ThreadInTeamModel[]> => {
handleThreadInTeam = async ({threadsMap, prepareRecordsOnly = false}: HandleThreadInTeamArgs): Promise<ThreadInTeamModel[]> => {
if (!threadsMap || !Object.keys(threadsMap).length) {
logWarning(
'An empty or undefined "threadsMap" object has been passed to the handleReceivedPostForChannel method',
@@ -26,12 +26,13 @@ const ThreadInTeamHandler = (superclass: any) => class extends superclass {
return [];
}
const update: RecordPair[] = [];
const create: ThreadInTeam[] = [];
const teamIds = Object.keys(threadsMap);
for await (const teamId of teamIds) {
const threadIds = threadsMap[teamId].map((thread) => thread.id);
const chunks = await (this.database as Database).get<ThreadInTeamModel>(THREADS_IN_TEAM).query(
Q.where('team_id', teamId),
Q.where('id', Q.oneOf(threadIds)),
).fetch();
const chunksMap = chunks.reduce((result: Record<string, ThreadInTeamModel>, chunk) => {
result[chunk.threadId] = chunk;
@@ -41,29 +42,18 @@ const ThreadInTeamHandler = (superclass: any) => class extends superclass {
for (const thread of threadsMap[teamId]) {
const chunk = chunksMap[thread.id];
const newValue = {
thread_id: thread.id,
team_id: teamId,
loaded_in_global_threads: Boolean(loadedInGlobalThreads),
};
// update record only if loaded_in_global_threads is true
if (chunk && loadedInGlobalThreads) {
update.push(getValidRecordsForUpdate({
tableName: THREADS_IN_TEAM,
newValue,
existingRecord: chunk,
}));
} else {
// create chunk
create.push(newValue);
// Create if the chunk is not found
if (!chunk) {
create.push({
thread_id: thread.id,
team_id: teamId,
});
}
}
}
const threadsInTeam = (await this.prepareRecords({
createRaws: getRawRecordPairs(create),
updateRaws: update,
transformer: transformThreadInTeamRecord,
tableName: THREADS_IN_TEAM,
})) as ThreadInTeamModel[];