Remove watermelondb limitation on updating an already updated model (#7067)

* Remove watermelondb limitation on updating an already updated model

* Add logic to handle different prepare states and improve logging

* fix tests

---------

Co-authored-by: Elias Nahum <nahumhbl@gmail.com>
This commit is contained in:
Daniel Espino García
2023-02-03 16:11:57 +01:00
committed by GitHub
parent 449c5edac9
commit 980c31f40f
83 changed files with 893 additions and 881 deletions

View File

@@ -46,7 +46,7 @@ describe('*** Operator: Category Handlers tests ***', () => {
tableName: MM_TABLES.SERVER.CATEGORY,
prepareRecordsOnly: false,
transformer: transformCategoryRecord,
});
}, 'handleCategories');
});
it('=> handleCategoryChannels: should write to the CATEGORY_CHANNEL table', async () => {
@@ -74,6 +74,6 @@ describe('*** Operator: Category Handlers tests ***', () => {
tableName: MM_TABLES.SERVER.CATEGORY_CHANNEL,
prepareRecordsOnly: false,
transformer: transformCategoryChannelRecord,
});
}, 'handleCategoryChannels');
});
});

View File

@@ -11,6 +11,7 @@ import {
import {getUniqueRawsBy} from '@database/operator/utils/general';
import {logWarning} from '@utils/log';
import type ServerDataOperatorBase from '.';
import type {
HandleCategoryChannelArgs,
HandleCategoryArgs,
@@ -28,7 +29,7 @@ export interface CategoryHandlerMix {
handleCategories: ({categories, prepareRecordsOnly}: HandleCategoryArgs) => Promise<CategoryModel[]>;
}
const CategoryHandler = (superclass: any) => class extends superclass {
const CategoryHandler = <TBase extends Constructor<ServerDataOperatorBase>>(superclass: TBase) => class extends superclass {
/**
* handleCategories: Handler responsible for the Create/Update operations occurring on the Category table from the 'Server' schema
* @param {HandleCategoryArgs} categoriesArgs
@@ -78,7 +79,7 @@ const CategoryHandler = (superclass: any) => class extends superclass {
createOrUpdateRawValues,
tableName: CATEGORY,
prepareRecordsOnly,
});
}, 'handleCategories');
};
/**
@@ -88,7 +89,7 @@ const CategoryHandler = (superclass: any) => class extends superclass {
* @param {boolean} categoriesArgs.prepareRecordsOnly
* @returns {Promise<CategoryChannelModel[]>}
*/
handleCategoryChannels = async ({categoryChannels, prepareRecordsOnly = true}: HandleCategoryChannelArgs): Promise<CategoryModel[]> => {
handleCategoryChannels = async ({categoryChannels, prepareRecordsOnly = true}: HandleCategoryChannelArgs): Promise<CategoryChannelModel[]> => {
if (!categoryChannels?.length) {
logWarning(
'An empty or undefined "categoryChannels" array has been passed to the handleCategories method',
@@ -105,7 +106,7 @@ const CategoryHandler = (superclass: any) => class extends superclass {
createOrUpdateRawValues,
tableName: CATEGORY_CHANNEL,
prepareRecordsOnly,
});
}, 'handleCategoryChannels');
};
};

View File

@@ -63,7 +63,7 @@ describe('*** Operator: Channel Handlers tests ***', () => {
tableName: 'Channel',
prepareRecordsOnly: false,
transformer: transformChannelRecord,
});
}, 'handleChannel');
});
it('=> HandleMyChannelSettings: should write to the MY_CHANNEL_SETTINGS table', async () => {
@@ -103,7 +103,7 @@ describe('*** Operator: Channel Handlers tests ***', () => {
prepareRecordsOnly: false,
buildKeyRecordBy: buildMyChannelKey,
transformer: transformMyChannelSettingsRecord,
});
}, 'handleMyChannelSettings');
});
it('=> HandleChannelInfo: should write to the CHANNEL_INFO table', async () => {
@@ -134,7 +134,7 @@ describe('*** Operator: Channel Handlers tests ***', () => {
tableName: 'ChannelInfo',
prepareRecordsOnly: false,
transformer: transformChannelInfoRecord,
});
}, 'handleChannelInfo');
});
it('=> HandleMyChannel: should write to the MY_CHANNEL table', async () => {
@@ -195,7 +195,7 @@ describe('*** Operator: Channel Handlers tests ***', () => {
prepareRecordsOnly: false,
buildKeyRecordBy: buildMyChannelKey,
transformer: transformMyChannelRecord,
});
}, 'handleMyChannel');
});
it('=> HandleMyChannel: should keep the previous lastFetchedAt for MY_CHANNEL', async () => {
@@ -314,6 +314,6 @@ describe('*** Operator: Channel Handlers tests ***', () => {
prepareRecordsOnly: false,
buildKeyRecordBy: buildChannelMembershipKey,
transformer: transformChannelMembershipRecord,
});
}, 'handleChannelMembership');
});
});

View File

@@ -19,6 +19,7 @@ import {getUniqueRawsBy} from '@database/operator/utils/general';
import {getIsCRTEnabled} from '@queries/servers/thread';
import {logWarning} from '@utils/log';
import type ServerDataOperatorBase from '.';
import type {HandleChannelArgs, HandleChannelInfoArgs, HandleChannelMembershipArgs, HandleMyChannelArgs, HandleMyChannelSettingsArgs} from '@typings/database/database';
import type ChannelModel from '@typings/database/models/servers/channel';
import type ChannelInfoModel from '@typings/database/models/servers/channel_info';
@@ -42,7 +43,7 @@ export interface ChannelHandlerMix {
handleMyChannel: ({channels, myChannels, isCRTEnabled, prepareRecordsOnly}: HandleMyChannelArgs) => Promise<MyChannelModel[]>;
}
const ChannelHandler = (superclass: any) => class extends superclass {
const ChannelHandler = <TBase extends Constructor<ServerDataOperatorBase>>(superclass: TBase) => class extends superclass {
/**
* handleChannel: Handler responsible for the Create/Update operations occurring on the CHANNEL table from the 'Server' schema
* @param {HandleChannelArgs} channelsArgs
@@ -89,7 +90,7 @@ const ChannelHandler = (superclass: any) => class extends superclass {
prepareRecordsOnly,
createOrUpdateRawValues,
tableName: CHANNEL,
});
}, 'handleChannel');
};
/**
@@ -146,7 +147,7 @@ const ChannelHandler = (superclass: any) => class extends superclass {
prepareRecordsOnly,
createOrUpdateRawValues,
tableName: MY_CHANNEL_SETTINGS,
});
}, 'handleMyChannelSettings');
};
/**
@@ -205,7 +206,7 @@ const ChannelHandler = (superclass: any) => class extends superclass {
prepareRecordsOnly,
createOrUpdateRawValues,
tableName: CHANNEL_INFO,
});
}, 'handleChannelInfo');
};
/**
@@ -292,7 +293,7 @@ const ChannelHandler = (superclass: any) => class extends superclass {
prepareRecordsOnly,
createOrUpdateRawValues,
tableName: MY_CHANNEL,
});
}, 'handleMyChannel');
};
/**
@@ -348,7 +349,7 @@ const ChannelHandler = (superclass: any) => class extends superclass {
prepareRecordsOnly,
createOrUpdateRawValues,
tableName: CHANNEL_MEMBERSHIP,
});
}, 'handleChannelMembership');
};
};

View File

@@ -49,6 +49,6 @@ describe('*** Operator: Group Handlers tests ***', () => {
tableName: MM_TABLES.SERVER.GROUP,
prepareRecordsOnly: false,
transformer: transformGroupRecord,
});
}, 'handleGroups');
});
});

View File

@@ -8,6 +8,7 @@ import {queryGroupChannelForChannel, queryGroupMembershipForMember, queryGroupTe
import {generateGroupAssociationId} from '@utils/groups';
import {logWarning} from '@utils/log';
import type ServerDataOperatorBase from '.';
import type {HandleGroupArgs, HandleGroupChannelsForChannelArgs, HandleGroupMembershipForMemberArgs, HandleGroupTeamsForTeamArgs} from '@typings/database/database';
import type GroupModel from '@typings/database/models/servers/group';
import type GroupChannelModel from '@typings/database/models/servers/group_channel';
@@ -23,7 +24,7 @@ export interface GroupHandlerMix {
handleGroupTeamsForTeam: ({teamId, groups, prepareRecordsOnly}: HandleGroupTeamsForTeamArgs) => Promise<GroupTeamModel[]>;
}
const GroupHandler = (superclass: any) => class extends superclass implements GroupHandlerMix {
const GroupHandler = <TBase extends Constructor<ServerDataOperatorBase>>(superclass: TBase) => class extends superclass implements GroupHandlerMix {
/**
* handleGroups: Handler responsible for the Create/Update operations occurring on the Group table from the 'Server' schema
*
@@ -46,7 +47,7 @@ const GroupHandler = (superclass: any) => class extends superclass implements Gr
createOrUpdateRawValues,
tableName: GROUP,
prepareRecordsOnly,
});
}, 'handleGroups');
};
/**
@@ -93,14 +94,14 @@ const GroupHandler = (superclass: any) => class extends superclass implements Gr
records.push(...(await this.handleRecords({
fieldName: 'id',
transformer: transformGroupChannelRecord,
rawValues,
createOrUpdateRawValues: rawValues,
tableName: GROUP_CHANNEL,
prepareRecordsOnly: true,
})));
}, 'handleGroupChannelsForChannel')));
// Batch update if there are records
if (records.length && !prepareRecordsOnly) {
await this.batchRecords(records);
await this.batchRecords(records, 'handleGroupChannelsForChannel');
}
return records;
@@ -150,14 +151,14 @@ const GroupHandler = (superclass: any) => class extends superclass implements Gr
records.push(...(await this.handleRecords({
fieldName: 'id',
transformer: transformGroupMembershipRecord,
rawValues,
createOrUpdateRawValues: rawValues,
tableName: GROUP_MEMBERSHIP,
prepareRecordsOnly: true,
})));
}, 'handleGroupMembershipsForMember')));
// Batch update if there are records
if (records.length && !prepareRecordsOnly) {
await this.batchRecords(records);
await this.batchRecords(records, 'handleGroupMembershipsForMember');
}
return records;
@@ -207,14 +208,14 @@ const GroupHandler = (superclass: any) => class extends superclass implements Gr
records.push(...(await this.handleRecords({
fieldName: 'id',
transformer: transformGroupTeamRecord,
rawValues,
createOrUpdateRawValues: rawValues,
tableName: GROUP_TEAM,
prepareRecordsOnly: true,
})));
}, 'handleGroupTeamsForTeam')));
// Batch update if there are records
if (records.length && !prepareRecordsOnly) {
await this.batchRecords(records);
await this.batchRecords(records, 'handleGroupTeamsForTeam');
}
return records;

View File

@@ -42,7 +42,7 @@ describe('*** DataOperator: Base Handlers tests ***', () => {
createOrUpdateRawValues: roles,
tableName: 'Role',
prepareRecordsOnly: false,
});
}, 'handleRole');
});
it('=> HandleCustomEmojis: should write to the CUSTOM_EMOJI table', async () => {
@@ -72,7 +72,7 @@ describe('*** DataOperator: Base Handlers tests ***', () => {
tableName: 'CustomEmoji',
prepareRecordsOnly: false,
transformer: transformCustomEmojiRecord,
});
}, 'handleCustomEmojis');
});
it('=> HandleSystem: should write to the SYSTEM table', async () => {
@@ -93,7 +93,7 @@ describe('*** DataOperator: Base Handlers tests ***', () => {
createOrUpdateRawValues: systems,
tableName: 'System',
prepareRecordsOnly: false,
});
}, 'handleSystem');
});
it('=> HandleConfig: should write to the CONFIG table', async () => {
@@ -117,7 +117,7 @@ describe('*** DataOperator: Base Handlers tests ***', () => {
tableName: 'Config',
prepareRecordsOnly: false,
deleteRawValues: configsToDelete,
});
}, 'handleConfigs');
});
it('=> No table name: should not call execute if tableName is invalid', async () => {
@@ -135,7 +135,7 @@ describe('*** DataOperator: Base Handlers tests ***', () => {
transformer: transformSystemRecord,
createOrUpdateRawValues: [{id: 'tos-1', value: '1'}],
prepareRecordsOnly: false,
}),
}, 'test'),
).rejects.toThrow(Error);
});
});

View File

@@ -12,13 +12,17 @@ import {
import {getUniqueRawsBy} from '@database/operator/utils/general';
import {logWarning} from '@utils/log';
import {sanitizeReactions} from '../../utils/reaction';
import {transformReactionRecord} from '../transformers/reaction';
import type {Model} from '@nozbe/watermelondb';
import type {HandleConfigArgs, HandleCustomEmojiArgs, HandleRoleArgs, HandleSystemArgs, OperationArgs} from '@typings/database/database';
import type {HandleConfigArgs, HandleCustomEmojiArgs, HandleReactionsArgs, HandleRoleArgs, HandleSystemArgs, OperationArgs} from '@typings/database/database';
import type CustomEmojiModel from '@typings/database/models/servers/custom_emoji';
import type ReactionModel from '@typings/database/models/servers/reaction';
import type RoleModel from '@typings/database/models/servers/role';
import type SystemModel from '@typings/database/models/servers/system';
const {SERVER: {CONFIG, CUSTOM_EMOJI, ROLE, SYSTEM}} = MM_TABLES;
const {SERVER: {CONFIG, CUSTOM_EMOJI, ROLE, SYSTEM, REACTION}} = MM_TABLES;
export default class ServerDataOperatorBase extends BaseDataOperator {
handleRole = async ({roles, prepareRecordsOnly = true}: HandleRoleArgs) => {
@@ -35,7 +39,7 @@ export default class ServerDataOperatorBase extends BaseDataOperator {
prepareRecordsOnly,
createOrUpdateRawValues: getUniqueRawsBy({raws: roles, key: 'id'}),
tableName: ROLE,
}) as Promise<RoleModel[]>;
}, 'handleRole') as Promise<RoleModel[]>;
};
handleCustomEmojis = async ({emojis, prepareRecordsOnly = true}: HandleCustomEmojiArgs) => {
@@ -52,7 +56,7 @@ export default class ServerDataOperatorBase extends BaseDataOperator {
prepareRecordsOnly,
createOrUpdateRawValues: getUniqueRawsBy({raws: emojis, key: 'name'}),
tableName: CUSTOM_EMOJI,
}) as Promise<CustomEmojiModel[]>;
}, 'handleCustomEmojis') as Promise<CustomEmojiModel[]>;
};
handleSystem = async ({systems, prepareRecordsOnly = true}: HandleSystemArgs) => {
@@ -69,7 +73,7 @@ export default class ServerDataOperatorBase extends BaseDataOperator {
prepareRecordsOnly,
createOrUpdateRawValues: getUniqueRawsBy({raws: systems, key: 'id'}),
tableName: SYSTEM,
}) as Promise<SystemModel[]>;
}, 'handleSystem') as Promise<SystemModel[]>;
};
handleConfigs = async ({configs, configsToDelete, prepareRecordsOnly = true}: HandleConfigArgs) => {
@@ -87,7 +91,64 @@ export default class ServerDataOperatorBase extends BaseDataOperator {
createOrUpdateRawValues: getUniqueRawsBy({raws: configs, key: 'id'}),
tableName: CONFIG,
deleteRawValues: configsToDelete,
});
}, 'handleConfigs');
};
/**
* handleReactions: Handler responsible for the Create/Update operations occurring on the Reaction table from the 'Server' schema
* @param {HandleReactionsArgs} handleReactions
* @param {ReactionsPerPost[]} handleReactions.postsReactions
* @param {boolean} handleReactions.prepareRecordsOnly
* @param {boolean} handleReactions.skipSync
* @returns {Promise<ReactionModel[]>}
*/
handleReactions = async ({postsReactions, prepareRecordsOnly, skipSync}: HandleReactionsArgs): Promise<ReactionModel[]> => {
    // Accumulates prepared creations and deletions across every post.
    const batchRecords: ReactionModel[] = [];

    if (!postsReactions?.length) {
        logWarning(
            'An empty or undefined "postsReactions" array has been passed to the handleReactions method',
        );
        return [];
    }

    // Plain for..of: postsReactions is a regular array, so `for await` added
    // nothing here — the awaits inside the body already run sequentially.
    for (const postReactions of postsReactions) {
        const {post_id, reactions} = postReactions;

        // Split the incoming raw reactions into new ones to create and stale
        // models to remove for this post.
        const {
            createReactions,
            deleteReactions,
        } = await sanitizeReactions({
            database: this.database,
            post_id,
            rawReactions: reactions,
            skipSync,
        });

        if (createReactions?.length) {
            // Prepares record for model Reactions
            const reactionsRecords = (await this.prepareRecords({
                createRaws: createReactions,
                transformer: transformReactionRecord,
                tableName: REACTION,
            })) as ReactionModel[];
            batchRecords.push(...reactionsRecords);
        }

        if (deleteReactions?.length && !skipSync) {
            // Stale reactions are destroyed unless the caller asked to skip syncing.
            deleteReactions.forEach((outCast) => outCast.prepareDestroyPermanently());
            batchRecords.push(...deleteReactions);
        }
    }

    if (prepareRecordsOnly) {
        return batchRecords;
    }

    // batchRecords is a local array and always defined; no optional chaining needed.
    if (batchRecords.length) {
        await this.batchRecords(batchRecords, 'handleReactions');
    }

    return batchRecords;
};
/**
@@ -99,7 +160,7 @@ export default class ServerDataOperatorBase extends BaseDataOperator {
* @param {(TransformerArgs) => Promise<Model>} execute.recordOperator
* @returns {Promise<void>}
*/
async execute({createRaws, transformer, tableName, updateRaws}: OperationArgs): Promise<Model[]> {
async execute<T extends Model>({createRaws, transformer, tableName, updateRaws}: OperationArgs<T>, description: string): Promise<T[]> {
const models = await this.prepareRecords({
tableName,
createRaws,
@@ -108,7 +169,7 @@ export default class ServerDataOperatorBase extends BaseDataOperator {
});
if (models?.length > 0) {
await this.batchRecords(models);
await this.batchRecords(models, description);
}
return models;

View File

@@ -61,7 +61,7 @@ describe('*** Operator: Post Handlers tests ***', () => {
createOrUpdateRawValues: drafts,
tableName: 'Draft',
prepareRecordsOnly: false,
});
}, 'handleDraft');
});
it('=> HandlePosts: should write to the Post and its sub-child tables', async () => {

View File

@@ -9,15 +9,19 @@ import {buildDraftKey} from '@database/operator/server_data_operator/comparators
import {
transformDraftRecord,
transformFileRecord,
transformPostInThreadRecord,
transformPostRecord,
transformPostsInChannelRecord,
} from '@database/operator/server_data_operator/transformers/post';
import {getUniqueRawsBy} from '@database/operator/utils/general';
import {createPostsChain} from '@database/operator/utils/post';
import {getRawRecordPairs, getUniqueRawsBy, getValidRecordsForUpdate} from '@database/operator/utils/general';
import {createPostsChain, getPostListEdges} from '@database/operator/utils/post';
import {emptyFunction} from '@utils/general';
import {logWarning} from '@utils/log';
import type ServerDataOperatorBase from '.';
import type Database from '@nozbe/watermelondb/Database';
import type Model from '@nozbe/watermelondb/Model';
import type {HandleDraftArgs, HandleFilesArgs, HandlePostsArgs, ProcessRecordResults} from '@typings/database/database';
import type {HandleDraftArgs, HandleFilesArgs, HandlePostsArgs, RecordPair} from '@typings/database/database';
import type DraftModel from '@typings/database/models/servers/draft';
import type FileModel from '@typings/database/models/servers/file';
import type PostModel from '@typings/database/models/servers/post';
@@ -29,6 +33,8 @@ const {
DRAFT,
FILE,
POST,
POSTS_IN_CHANNEL,
POSTS_IN_THREAD,
} = MM_TABLES.SERVER;
type PostActionType = keyof typeof ActionType.POSTS;
@@ -39,9 +45,17 @@ export interface PostHandlerMix {
handlePosts: ({actionType, order, posts, previousPostId, prepareRecordsOnly}: HandlePostsArgs) => Promise<Model[]>;
handlePostsInChannel: (posts: Post[]) => Promise<void>;
handlePostsInThread: (rootPosts: PostsInThread[]) => Promise<void>;
handleReceivedPostsInChannel: (posts: Post[], prepareRecordsOnly?: boolean) => Promise<PostsInChannelModel[]>;
handleReceivedPostsInChannelSince: (posts: Post[], prepareRecordsOnly?: boolean) => Promise<PostsInChannelModel[]>;
handleReceivedPostsInChannelBefore: (posts: Post[], prepareRecordsOnly?: boolean) => Promise<PostsInChannelModel[]>;
handleReceivedPostsInChannelAfter: (posts: Post[], prepareRecordsOnly?: boolean) => Promise<PostsInChannelModel[]>;
handleReceivedPostForChannel: (post: Post, prepareRecordsOnly?: boolean) => Promise<PostsInChannelModel[]>;
handleReceivedPostsInThread: (postsMap: Record<string, Post[]>, prepareRecordsOnly?: boolean) => Promise<PostsInThreadModel[]>;
}
const PostHandler = (superclass: any) => class extends superclass {
const PostHandler = <TBase extends Constructor<ServerDataOperatorBase>>(superclass: TBase) => class extends superclass {
/**
* handleDraft: Handler responsible for the Create/Update operations occurring the Draft table from the 'Server' schema
* @param {HandleDraftArgs} draftsArgs
@@ -66,7 +80,7 @@ const PostHandler = (superclass: any) => class extends superclass {
prepareRecordsOnly,
createOrUpdateRawValues,
tableName: DRAFT,
});
}, 'handleDraft');
};
/**
@@ -171,7 +185,7 @@ const PostHandler = (superclass: any) => class extends superclass {
deleteRawValues: pendingPostsToDelete,
tableName,
fieldName: 'id',
})) as ProcessRecordResults;
}));
const preparedPosts = (await this.prepareRecords({
createRaws: processedPosts.createRaws,
@@ -220,7 +234,7 @@ const PostHandler = (superclass: any) => class extends superclass {
}
if (batch.length && !prepareRecordsOnly) {
await this.batchRecords(batch);
await this.batchRecords(batch, 'handlePosts');
}
return batch;
@@ -245,7 +259,8 @@ const PostHandler = (superclass: any) => class extends superclass {
createOrUpdateRawValues: files,
tableName: FILE,
fieldName: 'id',
})) as ProcessRecordResults;
deleteRawValues: [],
}));
const postFiles = await this.prepareRecords({
createRaws: processedFiles.createRaws,
@@ -259,7 +274,7 @@ const PostHandler = (superclass: any) => class extends superclass {
}
if (postFiles?.length) {
await this.batchRecords(postFiles);
await this.batchRecords(postFiles, 'handleFiles');
}
return postFiles;
@@ -327,6 +342,340 @@ const PostHandler = (superclass: any) => class extends superclass {
return [];
};
// ========================
// POST IN CHANNEL
// ========================
_createPostsInChannelRecord = (channelId: string, earliest: number, latest: number, prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
    // Single raw row describing the contiguous block of posts for this channel.
    const createRaws = [{record: undefined, raw: {channel_id: channelId, earliest, latest}}];

    if (prepareRecordsOnly) {
        // Caller will batch the prepared record itself; do not write yet.
        return this.prepareRecords({
            tableName: POSTS_IN_CHANNEL,
            createRaws,
            transformer: transformPostsInChannelRecord,
        });
    }

    // Prepare and persist the new chunk in one step.
    return this.execute({
        createRaws,
        tableName: POSTS_IN_CHANNEL,
        transformer: transformPostsInChannelRecord,
    }, '_createPostsInChannelRecord');
};
_mergePostInChannelChunks = async (newChunk: PostsInChannelModel, existingChunks: PostsInChannelModel[], prepareRecordsOnly = false) => {
    const result: PostsInChannelModel[] = [];

    // Locate the first existing chunk whose range the new chunk fully covers.
    const covered = existingChunks.find((chunk) => newChunk.earliest <= chunk.earliest && newChunk.latest >= chunk.latest);
    if (covered) {
        if (!prepareRecordsOnly) {
            // No-op prepareUpdate so the new chunk is included in the batch write.
            newChunk.prepareUpdate(emptyFunction);
        }

        // Keep the new chunk and permanently drop the one it subsumes.
        result.push(newChunk, covered.prepareDestroyPermanently());
    }

    if (result.length && !prepareRecordsOnly) {
        await this.batchRecords(result, '_mergePostInChannelChunks');
    }

    return result;
};
// handleReceivedPostsInChannel: records the create_at time range covered by a
// freshly received chain of posts in the PostsInChannel table — either by
// widening an overlapping chunk or by creating a new chunk and collapsing any
// existing chunk it fully covers. Returns the touched chunk models, or [].
// Assumes getPostListEdges yields the earliest/latest posts of the chain —
// TODO confirm against @database/operator/utils/post.
handleReceivedPostsInChannel = async (posts?: Post[], prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
if (!posts?.length) {
logWarning(
'An empty or undefined "posts" array has been passed to the handleReceivedPostsInChannel method',
);
return [];
}
const {firstPost, lastPost} = getPostListEdges(posts);
// Channel Id for this chain of posts
const channelId = firstPost.channel_id;
// Find smallest 'create_at' value in chain
const earliest = firstPost.create_at;
// Highest 'create_at' value in chain; equals `earliest` when the posts array has a single item
const latest = lastPost.create_at;
// Find the records in the PostsInChannel table that have a matching channel_id, most recent first
const chunks = (await this.database.get(POSTS_IN_CHANNEL).query(
Q.where('channel_id', channelId),
Q.sortBy('latest', Q.desc),
).fetch()) as PostsInChannelModel[];
// chunk length 0; then it's a new chunk to be added to the PostsInChannel table
if (chunks.length === 0) {
return this._createPostsInChannelRecord(channelId, earliest, latest, prepareRecordsOnly);
}
let targetChunk: PostsInChannelModel|undefined;
for (const chunk of chunks) {
// pick the first chunk this chain overlaps with (chain starts inside it, or ends before its end)
if (firstPost.create_at >= chunk.earliest || latest <= chunk.latest) {
targetChunk = chunk;
break;
}
}
if (targetChunk) {
// Chain already fully contained in the chunk: nothing to write
if (
targetChunk.earliest <= earliest &&
targetChunk.latest >= latest
) {
return [];
}
// Widen the chunk to cover the new chain; prepare only when the caller batches later
if (prepareRecordsOnly) {
targetChunk.prepareUpdate((record) => {
record.earliest = Math.min(record.earliest, earliest);
record.latest = Math.max(record.latest, latest);
});
return [targetChunk];
}
// Otherwise write the widened range immediately inside a writer block
targetChunk = await this.database.write(async () => {
return targetChunk!.update((record) => {
record.earliest = Math.min(record.earliest, earliest);
record.latest = Math.max(record.latest, latest);
});
});
return [targetChunk!];
}
// Create a new chunk and merge them if needed
const newChunk = await this._createPostsInChannelRecord(channelId, earliest, latest, prepareRecordsOnly);
const merged = await this._mergePostInChannelChunks(newChunk[0], chunks, prepareRecordsOnly);
return merged;
};
// handleReceivedPostsInChannelSince: after a "since" fetch, extends the most
// recent PostsInChannel chunk of the posts' channel so that `latest` covers
// any genuinely new post. Posts whose create_at falls inside the chunk (i.e.
// edits of existing messages) are skipped.
// @param posts - posts received from the server; assumed to belong to one channel
// @param prepareRecordsOnly - when true, prepare the update for a later batch instead of writing
// @returns the updated chunk in a one-element array, or [] when nothing changed
handleReceivedPostsInChannelSince = async (posts: Post[], prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
    if (!posts?.length) {
        logWarning(
            'An empty or undefined "posts" array has been passed to the handleReceivedPostsInChannelSince method',
        );
        return [];
    }

    const {firstPost} = getPostListEdges(posts);

    let latest = 0;
    let recentChunk: PostsInChannelModel|undefined;
    const chunks = (await this.database.get(POSTS_IN_CHANNEL).query(
        Q.where('channel_id', firstPost.channel_id),
        Q.sortBy('latest', Q.desc),
    ).fetch()) as PostsInChannelModel[];
    if (chunks.length) {
        recentChunk = chunks[0];

        // Track the newest create_at among posts newer than the chunk.
        // Math.max (instead of plain assignment) keeps this correct even when
        // the posts array is not sorted by create_at.
        for (const post of posts) {
            if (post.create_at > recentChunk.latest) {
                latest = Math.max(latest, post.create_at);
            }
        }
    }

    if (recentChunk && recentChunk.latest < latest) {
        // We've got new posts that belong to this chunk
        if (prepareRecordsOnly) {
            recentChunk.prepareUpdate((record) => {
                record.latest = Math.max(record.latest, latest);
            });
            return [recentChunk];
        }

        recentChunk = await this.database.write(async () => {
            return recentChunk!.update((record) => {
                record.latest = Math.max(record.latest, latest);
            });
        });
        return [recentChunk!];
    }

    return [];
};
// handleReceivedPostsInChannelBefore: after fetching posts older than those
// already stored, extends the most recent PostsInChannel chunk backwards so
// that `earliest` covers the oldest received post.
// @param posts - posts received from the server; assumed to belong to one channel
// @param prepareRecordsOnly - when true, prepare the update for a later batch instead of writing
// @returns the updated chunk in a one-element array, or [] when there is nothing to extend
handleReceivedPostsInChannelBefore = async (posts: Post[], prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
    if (!posts?.length) {
        logWarning(
            'An empty or undefined "posts" array has been passed to the handleReceivedPostsInChannelBefore method',
        );
        return [];
    }

    const {firstPost} = getPostListEdges(posts);

    // Channel Id for this chain of posts
    const channelId = firstPost.channel_id;

    // Find smallest 'create_at' value in chain
    const earliest = firstPost.create_at;

    // Find the records in the PostsInChannel table that have a matching channel_id, most recent first
    const chunks = (await this.database.get(POSTS_IN_CHANNEL).query(
        Q.where('channel_id', channelId),
        Q.sortBy('latest', Q.desc),
    ).fetch()) as PostsInChannelModel[];

    if (chunks.length === 0) {
        // No chunks found, previous posts in this block not found
        return [];
    }

    // The guard above guarantees chunks[0] exists; the previous trailing
    // `return targetChunk;` (a single model where the signature promises an
    // array) was unreachable dead code and has been removed.
    let targetChunk = chunks[0];
    if (targetChunk.earliest <= earliest) {
        // Chunk already reaches at least as far back; nothing to do
        return [];
    }

    // If the chunk was found, update the chunk and return
    if (prepareRecordsOnly) {
        targetChunk.prepareUpdate((record) => {
            record.earliest = Math.min(record.earliest, earliest);
        });
        return [targetChunk];
    }

    targetChunk = await this.database.write(async () => {
        return targetChunk.update((record) => {
            record.earliest = Math.min(record.earliest, earliest);
        });
    });
    return [targetChunk];
};
// handleReceivedPostsInChannelAfter: placeholder for recording posts fetched
// after a given post; not implemented yet, so it always throws.
// @throws {Error} always
handleReceivedPostsInChannelAfter = async (posts: Post[], prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
    // Include the arguments in the message (with a '=' separator that was
    // previously missing after "count") so failing call sites are traceable.
    throw new Error(`handleReceivedPostsInChannelAfter Not implemented yet. posts count=${posts.length} prepareRecordsOnly=${prepareRecordsOnly}`);
};
// handleReceivedPostForChannel: extends the most recent PostsInChannel chunk
// of the posts' channel forward (or creates a chunk when none exists) so that
// `latest` covers the newest received post.
// NOTE(review): the PostHandlerMix interface declares this as `(post: Post, ...)`
// while the implementation takes an array — confirm which signature is intended.
// @returns the created/updated chunk models, or [] when the chunk already covers the posts
handleReceivedPostForChannel = async (posts: Post[], prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
    if (!posts?.length) {
        logWarning(
            'An empty or undefined "posts" array has been passed to the handleReceivedPostForChannel method',
        );
        return [];
    }

    const {firstPost, lastPost} = getPostListEdges(posts);

    // Channel Id for this chain of posts
    const channelId = firstPost.channel_id;

    // Find smallest 'create_at' value in chain
    const earliest = firstPost.create_at;

    // Find highest 'create_at' value in chain
    const latest = lastPost.create_at;

    // Find the records in the PostsInChannel table that have a matching channel_id, most recent first
    const chunks = (await this.database.get(POSTS_IN_CHANNEL).query(
        Q.where('channel_id', channelId),
        Q.sortBy('latest', Q.desc),
    ).fetch()) as PostsInChannelModel[];

    // chunk length 0; then it's a new chunk to be added to the PostsInChannel table
    if (chunks.length === 0) {
        return this._createPostsInChannelRecord(channelId, earliest, latest, prepareRecordsOnly);
    }

    // The guard above guarantees chunks[0] exists; the previous trailing
    // `return targetChunk;` (a single model where the signature promises an
    // array) was unreachable dead code and has been removed.
    let targetChunk = chunks[0];
    if (targetChunk.latest >= latest) {
        // Chunk already covers these posts
        return [];
    }

    // If the chunk was found, update the chunk and return
    if (prepareRecordsOnly) {
        targetChunk.prepareUpdate((record) => {
            record.latest = Math.max(record.latest, latest);
        });
        return [targetChunk];
    }

    targetChunk = await this.database.write(async () => {
        return targetChunk.update((record) => {
            record.latest = Math.max(record.latest, latest);
        });
    });
    return [targetChunk];
};
// ========================
// POST IN THREAD
// ========================
// handleReceivedPostsInThread: maintains the PostsInThread chunk per thread
// root. For each root id in the map it widens the most recent existing chunk
// to cover the received posts' create_at range, or creates a new chunk when
// none exists. Returns the prepared/batched PostsInThread models.
// Assumes getPostListEdges yields the earliest/latest posts — TODO confirm.
handleReceivedPostsInThread = async (postsMap: Record<string, Post[]>, prepareRecordsOnly = false): Promise<PostsInThreadModel[]> => {
if (!postsMap || !Object.keys(postsMap).length) {
logWarning(
'An empty or undefined "postsMap" object has been passed to the handleReceivedPostsInThread method',
);
return [];
}
const update: RecordPair[] = [];
const create: PostsInThread[] = [];
const ids = Object.keys(postsMap);
for await (const rootId of ids) {
const {firstPost, lastPost} = getPostListEdges(postsMap[rootId]);
// Most recent chunk first for this thread root
const chunks = (await this.database.get(POSTS_IN_THREAD).query(
Q.where('root_id', rootId),
Q.sortBy('latest', Q.desc),
).fetch()) as PostsInThreadModel[];
if (chunks.length) {
// Widen the newest chunk so it spans both the old and the new range
const chunk = chunks[0];
const newValue = {
root_id: rootId,
earliest: Math.min(chunk.earliest, firstPost.create_at),
latest: Math.max(chunk.latest, lastPost.create_at),
};
update.push(getValidRecordsForUpdate({
tableName: POSTS_IN_THREAD,
newValue,
existingRecord: chunk,
}));
} else {
// create chunk
create.push({
root_id: rootId,
earliest: firstPost.create_at,
latest: lastPost.create_at,
});
}
}
// Prepare creations and updates together in a single pass
const postInThreadRecords = (await this.prepareRecords({
createRaws: getRawRecordPairs(create),
updateRaws: update,
transformer: transformPostInThreadRecord,
tableName: POSTS_IN_THREAD,
})) as PostsInThreadModel[];
if (postInThreadRecords?.length && !prepareRecordsOnly) {
await this.batchRecords(postInThreadRecords, 'handleReceivedPostsInThread');
}
return postInThreadRecords;
};
};
export default PostHandler;

View File

@@ -1,298 +0,0 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
import {Q} from '@nozbe/watermelondb';
import {Database} from '@constants';
import {getPostListEdges} from '@database//operator/utils/post';
import {transformPostsInChannelRecord} from '@database/operator/server_data_operator/transformers/post';
import {emptyFunction} from '@utils/general';
import {logWarning} from '@utils/log';
import type PostsInChannelModel from '@typings/database/models/servers/posts_in_channel';
export interface PostsInChannelHandlerMix {
handleReceivedPostsInChannel: (posts: Post[], prepareRecordsOnly?: boolean) => Promise<PostsInChannelModel[]>;
handleReceivedPostsInChannelSince: (posts: Post[], prepareRecordsOnly?: boolean) => Promise<PostsInChannelModel[]>;
handleReceivedPostsInChannelBefore: (posts: Post[], prepareRecordsOnly?: boolean) => Promise<PostsInChannelModel[]>;
handleReceivedPostsInChannelAfter: (posts: Post[], prepareRecordsOnly?: boolean) => Promise<PostsInChannelModel[]>;
handleReceivedPostForChannel: (post: Post, prepareRecordsOnly?: boolean) => Promise<PostsInChannelModel[]>;
}
const {POSTS_IN_CHANNEL} = Database.MM_TABLES.SERVER;
const PostsInChannelHandler = (superclass: any) => class extends superclass {
// _createPostsInChannelRecord: creates one PostsInChannel row spanning
// [earliest, latest] for the given channel; only prepares the record when
// prepareRecordsOnly is true so the caller can batch it later.
_createPostsInChannelRecord = (channelId: string, earliest: number, latest: number, prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
// We should prepare instead of execute
if (prepareRecordsOnly) {
return this.prepareRecords({
tableName: POSTS_IN_CHANNEL,
createRaws: [{record: undefined, raw: {channel_id: channelId, earliest, latest}}],
transformer: transformPostsInChannelRecord,
});
}
// Prepare and persist in one step
return this.execute({
createRaws: [{record: undefined, raw: {channel_id: channelId, earliest, latest}}],
tableName: POSTS_IN_CHANNEL,
transformer: transformPostsInChannelRecord,
});
};
// _mergePostInChannelChunks: if the new chunk fully covers an existing chunk,
// keeps the new one and destroys the first covered chunk; batches the result
// unless prepareRecordsOnly is set. Returns the affected models (possibly []).
_mergePostInChannelChunks = async (newChunk: PostsInChannelModel, existingChunks: PostsInChannelModel[], prepareRecordsOnly = false) => {
const result: PostsInChannelModel[] = [];
for (const chunk of existingChunks) {
// Only the first chunk fully contained in the new range is merged
if (newChunk.earliest <= chunk.earliest && newChunk.latest >= chunk.latest) {
if (!prepareRecordsOnly) {
// No-op prepareUpdate so the new chunk is included in the batch write
newChunk.prepareUpdate(emptyFunction);
}
result.push(newChunk);
result.push(chunk.prepareDestroyPermanently());
break;
}
}
if (result.length && !prepareRecordsOnly) {
await this.batchRecords(result);
}
return result;
};
// handleReceivedPostsInChannel: records the create_at time range covered by a
// received chain of posts in the PostsInChannel table — widening an
// overlapping chunk, or creating a new chunk and collapsing any chunk it
// fully covers. Returns the touched chunk models, or [].
handleReceivedPostsInChannel = async (posts?: Post[], prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
if (!posts?.length) {
logWarning(
'An empty or undefined "posts" array has been passed to the handleReceivedPostsInChannel method',
);
return [];
}
const {firstPost, lastPost} = getPostListEdges(posts);
// Channel Id for this chain of posts
const channelId = firstPost.channel_id;
// Find smallest 'create_at' value in chain
const earliest = firstPost.create_at;
// Highest 'create_at' value in chain; equals `earliest` when the posts array has a single item
const latest = lastPost.create_at;
// Find the records in the PostsInChannel table that have a matching channel_id, most recent first
const chunks = (await this.database.get(POSTS_IN_CHANNEL).query(
Q.where('channel_id', channelId),
Q.sortBy('latest', Q.desc),
).fetch()) as PostsInChannelModel[];
// chunk length 0; then it's a new chunk to be added to the PostsInChannel table
if (chunks.length === 0) {
return this._createPostsInChannelRecord(channelId, earliest, latest, prepareRecordsOnly);
}
let targetChunk: PostsInChannelModel|undefined;
for (const chunk of chunks) {
// pick the first chunk this chain overlaps with (chain starts inside it, or ends before its end)
if (firstPost.create_at >= chunk.earliest || latest <= chunk.latest) {
targetChunk = chunk;
break;
}
}
if (targetChunk) {
// Chain already fully contained in the chunk: nothing to write
if (
targetChunk.earliest <= earliest &&
targetChunk.latest >= latest
) {
return [];
}
// If the chunk was found, Update the chunk and return
if (prepareRecordsOnly) {
targetChunk.prepareUpdate((record) => {
record.earliest = Math.min(record.earliest, earliest);
record.latest = Math.max(record.latest, latest);
});
return [targetChunk];
}
// Otherwise write the widened range immediately inside a writer block
targetChunk = await this.database.write(async () => {
return targetChunk!.update((record) => {
record.earliest = Math.min(record.earliest, earliest);
record.latest = Math.max(record.latest, latest);
});
});
return [targetChunk!];
}
// Create a new chunk and merge them if needed
const newChunk = await this._createPostsInChannelRecord(channelId, earliest, latest, prepareRecordsOnly);
const merged = await this._mergePostInChannelChunks(newChunk[0], chunks, prepareRecordsOnly);
return merged;
};
handleReceivedPostsInChannelSince = async (posts: Post[], prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
if (!posts?.length) {
logWarning(
'An empty or undefined "posts" array has been passed to the handleReceivedPostsInChannelSince method',
);
return [];
}
const {firstPost} = getPostListEdges(posts);
let latest = 0;
let recentChunk: PostsInChannelModel|undefined;
const chunks = (await this.database.get(POSTS_IN_CHANNEL).query(
Q.where('channel_id', firstPost.channel_id),
Q.sortBy('latest', Q.desc),
).fetch()) as PostsInChannelModel[];
if (chunks.length) {
recentChunk = chunks[0];
// add any new recent post while skipping the ones that were just updated
for (const post of posts) {
if (post.create_at > recentChunk.latest) {
latest = post.create_at;
}
}
}
if (recentChunk && recentChunk.latest < latest) {
// We've got new posts that belong to this chunk
if (prepareRecordsOnly) {
recentChunk.prepareUpdate((record) => {
record.latest = Math.max(record.latest, latest);
});
return [recentChunk];
}
recentChunk = await this.database.write(async () => {
return recentChunk!.update((record) => {
record.latest = Math.max(record.latest, latest);
});
});
return [recentChunk!];
}
return [];
};
handleReceivedPostsInChannelBefore = async (posts: Post[], prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
if (!posts?.length) {
logWarning(
'An empty or undefined "posts" array has been passed to the handleReceivedPostsInChannelBefore method',
);
return [];
}
const {firstPost} = getPostListEdges(posts);
// Channel Id for this chain of posts
const channelId = firstPost.channel_id;
// Find smallest 'create_at' value in chain
const earliest = firstPost.create_at;
// Find the records in the PostsInChannel table that have a matching channel_id
const chunks = (await this.database.get(POSTS_IN_CHANNEL).query(
Q.where('channel_id', channelId),
Q.sortBy('latest', Q.desc),
).fetch()) as PostsInChannelModel[];
if (chunks.length === 0) {
// No chunks found, previous posts in this block not found
return [];
}
let targetChunk = chunks[0];
if (targetChunk) {
if (targetChunk.earliest <= earliest) {
return [];
}
// If the chunk was found, Update the chunk and return
if (prepareRecordsOnly) {
targetChunk.prepareUpdate((record) => {
record.earliest = Math.min(record.earliest, earliest);
});
return [targetChunk];
}
targetChunk = await this.database.write(async () => {
return targetChunk!.update((record) => {
record.earliest = Math.min(record.earliest, earliest);
});
});
return [targetChunk!];
}
return targetChunk;
};
handleReceivedPostsInChannelAfter = async (posts: Post[], prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
throw new Error(`handleReceivedPostsInChannelAfter Not implemented yet. posts count${posts.length} prepareRecordsOnly=${prepareRecordsOnly}`);
};
handleReceivedPostForChannel = async (posts: Post[], prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
if (!posts?.length) {
logWarning(
'An empty or undefined "posts" array has been passed to the handleReceivedPostForChannel method',
);
return [];
}
const {firstPost, lastPost} = getPostListEdges(posts);
// Channel Id for this chain of posts
const channelId = firstPost.channel_id;
// Find smallest 'create_at' value in chain
const earliest = firstPost.create_at;
// Find highest 'create_at' value in chain; -1 means we are dealing with one item in the posts array
const latest = lastPost.create_at;
// Find the records in the PostsInChannel table that have a matching channel_id
const chunks = (await this.database.get(POSTS_IN_CHANNEL).query(
Q.where('channel_id', channelId),
Q.sortBy('latest', Q.desc),
).fetch()) as PostsInChannelModel[];
// chunk length 0; then it's a new chunk to be added to the PostsInChannel table
if (chunks.length === 0) {
return this._createPostsInChannelRecord(channelId, earliest, latest, prepareRecordsOnly);
}
let targetChunk = chunks[0];
if (targetChunk) {
if (targetChunk.latest >= latest) {
return [];
}
// If the chunk was found, Update the chunk and return
if (prepareRecordsOnly) {
targetChunk.prepareUpdate((record) => {
record.latest = Math.max(record.latest, latest);
});
return [targetChunk];
}
targetChunk = await this.database.write(async () => {
return targetChunk!.update((record) => {
record.latest = Math.max(record.latest, latest);
});
});
return [targetChunk!];
}
return targetChunk;
};
};
export default PostsInChannelHandler;

View File

@@ -1,77 +0,0 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
import {Q} from '@nozbe/watermelondb';
import {Database} from '@constants';
import {getPostListEdges} from '@database//operator/utils/post';
import {transformPostInThreadRecord} from '@database/operator/server_data_operator/transformers/post';
import {getRawRecordPairs, getValidRecordsForUpdate} from '@database/operator/utils/general';
import {logWarning} from '@utils/log';
import type {RecordPair} from '@typings/database/database';
import type PostsInThreadModel from '@typings/database/models/servers/posts_in_thread';
/** Public surface contributed by the PostsInThreadHandler mixin. */
export interface PostsInThreadHandlerMix {
    handleReceivedPostsInThread: (postsMap: Record<string, Post[]>, prepareRecordsOnly?: boolean) => Promise<PostsInThreadModel[]>;
}
const {POSTS_IN_THREAD} = Database.MM_TABLES.SERVER;
/**
 * PostsInThreadHandler: mixin that maintains the PostsInThread table, which tracks
 * the earliest/latest create_at window of posts loaded for each thread (root post).
 * Applied to a data-operator base class providing `database`, `prepareRecords` and
 * `batchRecords`.
 */
const PostsInThreadHandler = (superclass: any) => class extends superclass {
    /**
     * For every rootId in the map, widens the thread's existing PostsInThread chunk
     * to cover the received posts, or creates a chunk when the thread has none yet.
     *
     * @param postsMap - posts grouped by their thread's root post id
     * @param prepareRecordsOnly - when true, records are prepared but not persisted
     * @returns the prepared (and possibly persisted) PostsInThread records
     */
    handleReceivedPostsInThread = async (postsMap: Record<string, Post[]>, prepareRecordsOnly = false): Promise<PostsInThreadModel[]> => {
        if (!postsMap || !Object.keys(postsMap).length) {
            logWarning(
                'An empty or undefined "postsMap" object has been passed to the handleReceivedPostsInThread method',
            );
            return [];
        }

        const toCreate: PostsInThread[] = [];
        const toUpdate: RecordPair[] = [];

        for (const rootId of Object.keys(postsMap)) {
            const {firstPost, lastPost} = getPostListEdges(postsMap[rootId]);

            // Most recent chunk first for this thread.
            const existing = (await this.database.get(POSTS_IN_THREAD).query(
                Q.where('root_id', rootId),
                Q.sortBy('latest', Q.desc),
            ).fetch()) as PostsInThreadModel[];

            if (!existing.length) {
                // No chunk for this thread yet: create one spanning the received posts.
                toCreate.push({
                    root_id: rootId,
                    earliest: firstPost.create_at,
                    latest: lastPost.create_at,
                });
            } else {
                // Widen the most recent chunk so it also covers the received posts.
                const recent = existing[0];
                toUpdate.push(getValidRecordsForUpdate({
                    tableName: POSTS_IN_THREAD,
                    newValue: {
                        root_id: rootId,
                        earliest: Math.min(recent.earliest, firstPost.create_at),
                        latest: Math.max(recent.latest, lastPost.create_at),
                    },
                    existingRecord: recent,
                }));
            }
        }

        const records = (await this.prepareRecords({
            createRaws: getRawRecordPairs(toCreate),
            updateRaws: toUpdate,
            transformer: transformPostInThreadRecord,
            tableName: POSTS_IN_THREAD,
        })) as PostsInThreadModel[];

        if (records?.length && !prepareRecordsOnly) {
            await this.batchRecords(records);
        }

        return records;
    };
};
export default PostsInThreadHandler;

View File

@@ -1,78 +0,0 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
import {MM_TABLES} from '@constants/database';
import {transformReactionRecord} from '@database/operator/server_data_operator/transformers/reaction';
import {sanitizeReactions} from '@database/operator/utils/reaction';
import {logWarning} from '@utils/log';
import type {HandleReactionsArgs} from '@typings/database/database';
import type CustomEmojiModel from '@typings/database/models/servers/custom_emoji';
import type ReactionModel from '@typings/database/models/servers/reaction';
const {REACTION} = MM_TABLES.SERVER;
/** Public surface contributed by the ReactionHandler mixin. */
export interface ReactionHandlerMix {
    handleReactions: ({postsReactions, prepareRecordsOnly}: HandleReactionsArgs) => Promise<Array<ReactionModel | CustomEmojiModel>>;
}
/**
 * ReactionHandler: mixin that reconciles post reactions against the Reaction table.
 * Applied to a data-operator base class providing `database`, `prepareRecords` and
 * `batchRecords`.
 */
const ReactionHandler = (superclass: any) => class extends superclass {
    /**
     * handleReactions: Handler responsible for the Create/Update operations occurring on the Reaction table from the 'Server' schema.
     * For each post, the incoming reactions are reconciled with the stored ones: new
     * reactions are prepared for creation and stale ones for permanent deletion.
     * @param {HandleReactionsArgs} handleReactions
     * @param {ReactionsPerPost[]} handleReactions.postsReactions
     * @param {boolean} handleReactions.prepareRecordsOnly
     * @param {boolean} handleReactions.skipSync - when true, stored reactions absent from the payload are kept
     * @returns {Promise<Array<(ReactionModel | CustomEmojiModel)>>}
     */
    handleReactions = async ({postsReactions, prepareRecordsOnly, skipSync}: HandleReactionsArgs): Promise<ReactionModel[]> => {
        if (!postsReactions?.length) {
            logWarning(
                'An empty or undefined "postsReactions" array has been passed to the handleReactions method',
            );
            return [];
        }

        const models: ReactionModel[] = [];

        for await (const {post_id, reactions} of postsReactions) {
            // Split the raw payload into reactions to insert and records to drop.
            const {createReactions, deleteReactions} = await sanitizeReactions({
                database: this.database,
                post_id,
                rawReactions: reactions,
                skipSync,
            });

            if (createReactions?.length) {
                // Prepares record for model Reactions
                const prepared = (await this.prepareRecords({
                    createRaws: createReactions,
                    transformer: transformReactionRecord,
                    tableName: REACTION,
                })) as ReactionModel[];
                models.push(...prepared);
            }

            if (!skipSync && deleteReactions?.length) {
                // Mark stale reactions for permanent removal and batch them too.
                for (const stale of deleteReactions) {
                    stale.prepareDestroyPermanently();
                }
                models.push(...deleteReactions);
            }
        }

        if (!prepareRecordsOnly && models.length) {
            await this.batchRecords(models);
        }

        return models;
    };
};
export default ReactionHandler;

View File

@@ -60,7 +60,7 @@ describe('*** Operator: Team Handlers tests ***', () => {
tableName: 'Team',
prepareRecordsOnly: false,
transformer: transformTeamRecord,
});
}, 'handleTeam');
});
it('=> HandleTeamMemberships: should write to the TEAM_MEMBERSHIP table', async () => {
@@ -98,7 +98,7 @@ describe('*** Operator: Team Handlers tests ***', () => {
prepareRecordsOnly: false,
buildKeyRecordBy: buildTeamMembershipKey,
transformer: transformTeamMembershipRecord,
});
}, 'handleTeamMemberships');
});
it('=> HandleMyTeam: should write to the MY_TEAM table', async () => {
@@ -124,7 +124,7 @@ describe('*** Operator: Team Handlers tests ***', () => {
tableName: 'MyTeam',
prepareRecordsOnly: false,
transformer: transformMyTeamRecord,
});
}, 'handleMyTeam');
});
it('=> HandleTeamChannelHistory: should write to the TEAM_CHANNEL_HISTORY table', async () => {
@@ -150,7 +150,7 @@ describe('*** Operator: Team Handlers tests ***', () => {
tableName: 'TeamChannelHistory',
prepareRecordsOnly: false,
transformer: transformTeamChannelHistoryRecord,
});
}, 'handleTeamChannelHistory');
});
it('=> HandleTeamSearchHistory: should write to the TEAM_SEARCH_HISTORY table', async () => {
@@ -179,6 +179,6 @@ describe('*** Operator: Team Handlers tests ***', () => {
prepareRecordsOnly: false,
buildKeyRecordBy: buildTeamSearchHistoryKey,
transformer: transformTeamSearchHistoryRecord,
});
}, 'handleTeamSearchHistory');
});
});

View File

@@ -18,6 +18,7 @@ import {
import {getUniqueRawsBy} from '@database/operator/utils/general';
import {logWarning} from '@utils/log';
import type ServerDataOperatorBase from '.';
import type {
HandleMyTeamArgs, HandleTeamArgs,
HandleTeamChannelHistoryArgs, HandleTeamMembershipArgs, HandleTeamSearchHistoryArgs,
@@ -44,7 +45,7 @@ export interface TeamHandlerMix {
handleMyTeam: ({myTeams, prepareRecordsOnly}: HandleMyTeamArgs) => Promise<MyTeamModel[]>;
}
const TeamHandler = (superclass: any) => class extends superclass {
const TeamHandler = <TBase extends Constructor<ServerDataOperatorBase>>(superclass: TBase) => class extends superclass {
/**
* handleTeamMemberships: Handler responsible for the Create/Update operations occurring on the TEAM_MEMBERSHIP table from the 'Server' schema
* @param {HandleTeamMembershipArgs} teamMembershipsArgs
@@ -97,7 +98,7 @@ const TeamHandler = (superclass: any) => class extends superclass {
createOrUpdateRawValues,
tableName: TEAM_MEMBERSHIP,
prepareRecordsOnly,
});
}, 'handleTeamMemberships');
};
/**
@@ -146,7 +147,7 @@ const TeamHandler = (superclass: any) => class extends superclass {
prepareRecordsOnly,
createOrUpdateRawValues,
tableName: TEAM,
});
}, 'handleTeam');
};
/**
@@ -172,7 +173,7 @@ const TeamHandler = (superclass: any) => class extends superclass {
prepareRecordsOnly,
createOrUpdateRawValues,
tableName: TEAM_CHANNEL_HISTORY,
});
}, 'handleTeamChannelHistory');
};
/**
@@ -199,7 +200,7 @@ const TeamHandler = (superclass: any) => class extends superclass {
prepareRecordsOnly,
createOrUpdateRawValues,
tableName: TEAM_SEARCH_HISTORY,
});
}, 'handleTeamSearchHistory');
};
/**
@@ -248,7 +249,7 @@ const TeamHandler = (superclass: any) => class extends superclass {
prepareRecordsOnly,
createOrUpdateRawValues,
tableName: MY_TEAM,
});
}, 'handleMyTeam');
};
};

View File

@@ -8,6 +8,7 @@ import {transformTeamThreadsSyncRecord} from '@database/operator/server_data_ope
import {getRawRecordPairs, getUniqueRawsBy, getValidRecordsForUpdate} from '@database/operator/utils/general';
import {logWarning} from '@utils/log';
import type ServerDataOperatorBase from '.';
import type {HandleTeamThreadsSyncArgs, RecordPair} from '@typings/database/database';
import type TeamThreadsSyncModel from '@typings/database/models/servers/team_threads_sync';
@@ -17,7 +18,7 @@ export interface TeamThreadsSyncHandlerMix {
const {TEAM_THREADS_SYNC} = MM_TABLES.SERVER;
const TeamThreadsSyncHandler = (superclass: any) => class extends superclass {
const TeamThreadsSyncHandler = <TBase extends Constructor<ServerDataOperatorBase>>(superclass: TBase) => class extends superclass {
handleTeamThreadsSync = async ({data, prepareRecordsOnly = false}: HandleTeamThreadsSyncArgs): Promise<TeamThreadsSyncModel[]> => {
if (!data || !data.length) {
logWarning(
@@ -61,7 +62,7 @@ const TeamThreadsSyncHandler = (superclass: any) => class extends superclass {
})) as TeamThreadsSyncModel[];
if (models?.length && !prepareRecordsOnly) {
await this.batchRecords(models);
await this.batchRecords(models, 'handleTeamThreadsSync');
}
return models;

View File

@@ -59,7 +59,7 @@ describe('*** Operator: Thread Handlers tests ***', () => {
createOrUpdateRawValues: threads,
tableName: 'Thread',
prepareRecordsOnly: true,
});
}, 'handleThreads(NEVER)');
// Should handle participants
expect(spyOnHandleThreadParticipants).toHaveBeenCalledWith({

View File

@@ -7,14 +7,16 @@ import {MM_TABLES} from '@constants/database';
import {
transformThreadRecord,
transformThreadParticipantRecord,
transformThreadInTeamRecord,
} from '@database/operator/server_data_operator/transformers/thread';
import {getUniqueRawsBy} from '@database/operator/utils/general';
import {getRawRecordPairs, getUniqueRawsBy} from '@database/operator/utils/general';
import {sanitizeThreadParticipants} from '@database/operator/utils/thread';
import {logWarning} from '@utils/log';
import type ServerDataOperatorBase from '.';
import type Database from '@nozbe/watermelondb/Database';
import type Model from '@nozbe/watermelondb/Model';
import type {HandleThreadsArgs, HandleThreadParticipantsArgs} from '@typings/database/database';
import type {HandleThreadsArgs, HandleThreadParticipantsArgs, HandleThreadInTeamArgs} from '@typings/database/database';
import type ThreadModel from '@typings/database/models/servers/thread';
import type ThreadInTeamModel from '@typings/database/models/servers/thread_in_team';
import type ThreadParticipantModel from '@typings/database/models/servers/thread_participant';
@@ -22,14 +24,16 @@ import type ThreadParticipantModel from '@typings/database/models/servers/thread
const {
THREAD,
THREAD_PARTICIPANT,
THREADS_IN_TEAM,
} = MM_TABLES.SERVER;
export interface ThreadHandlerMix {
handleThreads: ({threads, teamId, prepareRecordsOnly}: HandleThreadsArgs) => Promise<Model[]>;
handleThreadParticipants: ({threadsParticipants, prepareRecordsOnly}: HandleThreadParticipantsArgs) => Promise<ThreadParticipantModel[]>;
handleThreadInTeam: ({threadsMap, prepareRecordsOnly}: HandleThreadInTeamArgs) => Promise<ThreadInTeamModel[]>;
}
const ThreadHandler = (superclass: any) => class extends superclass {
const ThreadHandler = <TBase extends Constructor<ServerDataOperatorBase>>(superclass: TBase) => class extends superclass {
/**
* handleThreads: Handler responsible for the Create/Update operations occurring on the Thread table from the 'Server' schema
* @param {HandleThreadsArgs} handleThreads
@@ -49,11 +53,11 @@ const ThreadHandler = (superclass: any) => class extends superclass {
const uniqueThreads = getUniqueRawsBy({
raws: threads,
key: 'id',
}) as Thread[];
}) as ThreadWithLastFetchedAt[];
// Seperate threads to be deleted & created/updated
const deletedThreadIds: string[] = [];
const createOrUpdateThreads: Thread[] = [];
const createOrUpdateThreads: ThreadWithLastFetchedAt[] = [];
uniqueThreads.forEach((thread) => {
if (thread.delete_at > 0) {
deletedThreadIds.push(thread.id);
@@ -106,7 +110,7 @@ const ThreadHandler = (superclass: any) => class extends superclass {
prepareRecordsOnly: true,
createOrUpdateRawValues: createOrUpdateThreads,
tableName: THREAD,
}) as ThreadModel[];
}, 'handleThreads(NEVER)');
// Add the models to be batched here
const batch: Model[] = [...preparedThreads];
@@ -124,7 +128,7 @@ const ThreadHandler = (superclass: any) => class extends superclass {
}
if (batch.length && !prepareRecordsOnly) {
await this.batchRecords(batch);
await this.batchRecords(batch, 'handleThreads');
}
return batch;
@@ -175,11 +179,58 @@ const ThreadHandler = (superclass: any) => class extends superclass {
}
if (batchRecords?.length) {
await this.batchRecords(batchRecords);
await this.batchRecords(batchRecords, 'handleThreadParticipants');
}
return batchRecords;
};
handleThreadInTeam = async ({threadsMap, prepareRecordsOnly = false}: HandleThreadInTeamArgs): Promise<ThreadInTeamModel[]> => {
if (!threadsMap || !Object.keys(threadsMap).length) {
logWarning(
'An empty or undefined "threadsMap" object has been passed to the handleReceivedPostForChannel method',
);
return [];
}
const create: ThreadInTeam[] = [];
const teamIds = Object.keys(threadsMap);
for await (const teamId of teamIds) {
const threadIds = threadsMap[teamId].map((thread) => thread.id);
const chunks = await (this.database as Database).get<ThreadInTeamModel>(THREADS_IN_TEAM).query(
Q.where('team_id', teamId),
Q.where('id', Q.oneOf(threadIds)),
).fetch();
const chunksMap = chunks.reduce((result: Record<string, ThreadInTeamModel>, chunk) => {
result[chunk.threadId] = chunk;
return result;
}, {});
for (const thread of threadsMap[teamId]) {
const chunk = chunksMap[thread.id];
// Create if the chunk is not found
if (!chunk) {
create.push({
thread_id: thread.id,
team_id: teamId,
});
}
}
}
const threadsInTeam = (await this.prepareRecords({
createRaws: getRawRecordPairs(create),
transformer: transformThreadInTeamRecord,
tableName: THREADS_IN_TEAM,
})) as ThreadInTeamModel[];
if (threadsInTeam?.length && !prepareRecordsOnly) {
await this.batchRecords(threadsInTeam, 'handleThreadInTeam');
}
return threadsInTeam;
};
};
export default ThreadHandler;

View File

@@ -1,69 +0,0 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
import {Q, Database} from '@nozbe/watermelondb';
import {MM_TABLES} from '@constants/database';
import {transformThreadInTeamRecord} from '@database/operator/server_data_operator/transformers/thread';
import {getRawRecordPairs} from '@database/operator/utils/general';
import {logWarning} from '@utils/log';
import type {HandleThreadInTeamArgs} from '@typings/database/database';
import type ThreadInTeamModel from '@typings/database/models/servers/thread_in_team';
/** Public surface contributed by the ThreadInTeamHandler mixin. */
export interface ThreadInTeamHandlerMix {
    handleThreadInTeam: ({threadsMap, prepareRecordsOnly}: HandleThreadInTeamArgs) => Promise<ThreadInTeamModel[]>;
}
const {THREADS_IN_TEAM} = MM_TABLES.SERVER;
/**
 * ThreadInTeamHandler: mixin that creates the missing links between threads and the
 * team(s) they are shown in (THREADS_IN_TEAM join table). Applied to a data-operator
 * base class providing `database`, `prepareRecords` and `batchRecords`.
 */
const ThreadInTeamHandler = (superclass: any) => class extends superclass {
    /**
     * For each team in the map, creates ThreadInTeam records for threads that do not
     * have one yet; existing links are left untouched.
     *
     * @param {HandleThreadInTeamArgs} handleThreadInTeam
     * @param {Record<string, Thread[]>} handleThreadInTeam.threadsMap - threads grouped by team id
     * @param {boolean} handleThreadInTeam.prepareRecordsOnly - when true, records are prepared but not persisted
     * @returns {Promise<ThreadInTeamModel[]>}
     */
    handleThreadInTeam = async ({threadsMap, prepareRecordsOnly = false}: HandleThreadInTeamArgs): Promise<ThreadInTeamModel[]> => {
        if (!threadsMap || !Object.keys(threadsMap).length) {
            // Fix: the warning previously named "handleReceivedPostForChannel"
            // (copy/paste from another handler), which made the log misleading.
            logWarning(
                'An empty or undefined "threadsMap" object has been passed to the handleThreadInTeam method',
            );
            return [];
        }
        const create: ThreadInTeam[] = [];
        const teamIds = Object.keys(threadsMap);
        for await (const teamId of teamIds) {
            const threadIds = threadsMap[teamId].map((thread) => thread.id);

            // Fetch only the links that already exist for this team & these threads.
            // NOTE(review): the query matches the record 'id' column against thread
            // ids — assumes ThreadInTeam record ids equal thread ids; confirm schema.
            const chunks = await (this.database as Database).get<ThreadInTeamModel>(THREADS_IN_TEAM).query(
                Q.where('team_id', teamId),
                Q.where('id', Q.oneOf(threadIds)),
            ).fetch();

            // Index existing links by thread id for O(1) lookups below.
            const chunksMap = chunks.reduce((result: Record<string, ThreadInTeamModel>, chunk) => {
                result[chunk.threadId] = chunk;
                return result;
            }, {});

            for (const thread of threadsMap[teamId]) {
                // Create a link only when the thread has none for this team yet.
                if (!chunksMap[thread.id]) {
                    create.push({
                        thread_id: thread.id,
                        team_id: teamId,
                    });
                }
            }
        }
        const threadsInTeam = (await this.prepareRecords({
            createRaws: getRawRecordPairs(create),
            transformer: transformThreadInTeamRecord,
            tableName: THREADS_IN_TEAM,
        })) as ThreadInTeamModel[];
        if (threadsInTeam?.length && !prepareRecordsOnly) {
            await this.batchRecords(threadsInTeam);
        }
        return threadsInTeam;
    };
};
export default ThreadInTeamHandler;

View File

@@ -102,7 +102,7 @@ describe('*** Operator: User Handlers tests ***', () => {
tableName: 'User',
prepareRecordsOnly: false,
transformer: transformUserRecord,
});
}, 'handleUsers');
});
it('=> HandlePreferences: should write to the PREFERENCE table', async () => {
@@ -151,6 +151,6 @@ describe('*** Operator: User Handlers tests ***', () => {
prepareRecordsOnly: true,
buildKeyRecordBy: buildPreferenceKey,
transformer: transformPreferenceRecord,
});
}, 'handlePreferences(NEVER)');
});
});

View File

@@ -11,6 +11,7 @@ import {getUniqueRawsBy} from '@database/operator/utils/general';
import {filterPreferences} from '@helpers/api/preference';
import {logWarning} from '@utils/log';
import type ServerDataOperatorBase from '.';
import type {
HandlePreferencesArgs,
HandleUsersArgs,
@@ -25,7 +26,7 @@ export interface UserHandlerMix {
handleUsers: ({users, prepareRecordsOnly}: HandleUsersArgs) => Promise<UserModel[]>;
}
const UserHandler = (superclass: any) => class extends superclass {
const UserHandler = <TBase extends Constructor<ServerDataOperatorBase>>(superclass: TBase) => class extends superclass {
/**
* handlePreferences: Handler responsible for the Create/Update operations occurring on the PREFERENCE table from the 'Server' schema
* @param {HandlePreferencesArgs} preferencesArgs
@@ -87,7 +88,7 @@ const UserHandler = (superclass: any) => class extends superclass {
prepareRecordsOnly: true,
createOrUpdateRawValues,
tableName: PREFERENCE,
});
}, 'handlePreferences(NEVER)');
records.push(...createOrUpdate);
}
@@ -96,7 +97,7 @@ const UserHandler = (superclass: any) => class extends superclass {
}
if (records.length && !prepareRecordsOnly) {
await this.batchRecords(records);
await this.batchRecords(records, 'handlePreferences');
}
return records;
@@ -125,7 +126,7 @@ const UserHandler = (superclass: any) => class extends superclass {
createOrUpdateRawValues,
tableName: USER,
prepareRecordsOnly,
});
}, 'handleUsers');
};
};