Improve post chunk handling (#7309)

* Improve post chunk handling

* Remove unneeded merge and improve post list handling

* Fix unneeded requests

* Address feedback and fix lint
This commit is contained in:
Daniel Espino García
2023-04-27 16:04:46 +02:00
committed by GitHub
parent ab4f65020a
commit 30706382a0
7 changed files with 376 additions and 220 deletions

View File

@@ -1,15 +1,19 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
import {Q} from '@nozbe/watermelondb';
import {Database, Q} from '@nozbe/watermelondb';
import {ActionType} from '@constants';
import {OperationType} from '@constants/database';
import DatabaseManager from '@database/manager';
import {buildDraftKey} from '@database/operator/server_data_operator/comparators';
import {transformDraftRecord} from '@database/operator/server_data_operator/transformers/post';
import {transformDraftRecord, transformPostsInChannelRecord} from '@database/operator/server_data_operator/transformers/post';
import {createPostsChain} from '@database/operator/utils/post';
import {exportedForTest} from './post';
import type ServerDataOperator from '..';
import type PostsInChannelModel from '@typings/database/models/servers/posts_in_channel';
Q.sortBy = jest.fn().mockImplementation((field) => {
return Q.where(field, Q.gte(0));
@@ -306,3 +310,100 @@ describe('*** Operator: Post Handlers tests ***', () => {
expect(spyOnHandlePostsInChannel).toHaveBeenCalledWith(linkedPosts.slice(0, 3), actionType, true);
});
});
// Tests for the internal mergePostInChannelChunks helper, which reconciles a
// newly created posts-in-channel chunk with the chunks already stored for a channel.
describe('*** Operator: merge chunks ***', () => {
// The helper is not part of the operator's public API; it is re-exported for tests only.
const {mergePostInChannelChunks} = exportedForTest;
let database: Database;
let operator: ServerDataOperator;
const databaseName = 'baseHandler.test.com';
const channelId = '1234';
// Spin up a fresh server database per test so prepared records never leak between cases.
beforeEach(async () => {
await DatabaseManager.init([databaseName]);
const serverDatabase = DatabaseManager.serverDatabases[databaseName]!;
database = serverDatabase.database;
operator = serverDatabase.operator;
});
afterEach(async () => {
await DatabaseManager.destroyServerDatabase(databaseName);
});
// With no existing chunks there is nothing to merge: the result batch is empty
// and the new chunk's bounds are left untouched.
it('merge on empty chunks', async () => {
const newChunk = await transformPostsInChannelRecord({action: OperationType.CREATE, database, value: {record: undefined, raw: {channel_id: channelId, earliest: 0, latest: 100}}});
const chunks: PostsInChannelModel[] = [];
const result = await mergePostInChannelChunks(newChunk, chunks);
expect(result.length).toBe(0);
expect(newChunk.earliest).toBe(0);
expect(newChunk.latest).toBe(100);
});
// A chunk fully inside the new chunk's [earliest, latest] range is prepared for
// deletion; the new chunk does not need to grow.
// NOTE(review): _preparedState is a WatermelonDB internal — these assertions rely on it
// to check which operation was prepared without executing a batch.
it('remove contained chunks', async () => {
const newChunk = await transformPostsInChannelRecord({action: OperationType.CREATE, database, value: {record: undefined, raw: {channel_id: channelId, earliest: 0, latest: 100}}});
const chunks: PostsInChannelModel[] = [
await transformPostsInChannelRecord({action: OperationType.CREATE, database, value: {record: undefined, raw: {channel_id: channelId, earliest: 20, latest: 80}}}),
];
const result = await mergePostInChannelChunks(newChunk, chunks);
expect(result.length).toBe(1);
expect(newChunk.earliest).toBe(0);
expect(newChunk.latest).toBe(100);
expect(result[0]).toBe(chunks[0]);
expect(chunks[0]._preparedState).toBe('destroyPermanently');
});
// Chunks overlapping either end of the new chunk are absorbed: the new chunk's
// bounds expand to cover them and the absorbed chunks are prepared for deletion.
it('merge intersecting chunks', async () => {
const newChunk = await transformPostsInChannelRecord({action: OperationType.CREATE, database, value: {record: undefined, raw: {channel_id: channelId, earliest: 50, latest: 100}}});
const chunks: PostsInChannelModel[] = [
await transformPostsInChannelRecord({action: OperationType.CREATE, database, value: {record: undefined, raw: {channel_id: channelId, earliest: 25, latest: 70}}}),
await transformPostsInChannelRecord({action: OperationType.CREATE, database, value: {record: undefined, raw: {channel_id: channelId, earliest: 80, latest: 125}}}),
];
const result = await mergePostInChannelChunks(newChunk, chunks);
expect(result.length).toBe(3);
expect(newChunk.earliest).toBe(25);
expect(newChunk.latest).toBe(125);
expect(newChunk._preparedState).toBe('update');
expect(result[0]).toBe(chunks[0]);
expect(chunks[0]._preparedState).toBe('destroyPermanently');
expect(result[1]).toBe(chunks[1]);
expect(chunks[1]._preparedState).toBe('destroyPermanently');
expect(result[2]).toBe(newChunk);
// Also make sure the prepared batch can actually be committed.
await operator.batchRecords(result, 'test');
});
// Same as the intersecting case, but the new chunk itself is already present in
// the existing list; it must be skipped (by id) rather than merged with itself.
it('merge with the chunk present', async () => {
const newChunk = await transformPostsInChannelRecord({action: OperationType.CREATE, database, value: {record: undefined, raw: {channel_id: channelId, earliest: 50, latest: 100}}});
const chunks: PostsInChannelModel[] = [
await transformPostsInChannelRecord({action: OperationType.CREATE, database, value: {record: undefined, raw: {channel_id: channelId, earliest: 25, latest: 70}}}),
newChunk,
await transformPostsInChannelRecord({action: OperationType.CREATE, database, value: {record: undefined, raw: {channel_id: channelId, earliest: 80, latest: 125}}}),
];
const result = await mergePostInChannelChunks(newChunk, chunks);
expect(result.length).toBe(3);
expect(newChunk.earliest).toBe(25);
expect(newChunk.latest).toBe(125);
expect(newChunk._preparedState).toBe('update');
expect(result[0]).toBe(chunks[0]);
expect(chunks[0]._preparedState).toBe('destroyPermanently');
expect(result[1]).toBe(chunks[2]);
expect(chunks[2]._preparedState).toBe('destroyPermanently');
expect(result[2]).toBe(newChunk);
await operator.batchRecords(result, 'test');
});
// Disjoint chunks are left alone: empty batch, no bounds change, and every record
// keeps its original prepared state.
it('do nothing with no intersecting chunks', async () => {
const newChunk = await transformPostsInChannelRecord({action: OperationType.CREATE, database, value: {record: undefined, raw: {channel_id: channelId, earliest: 50, latest: 100}}});
const chunks: PostsInChannelModel[] = [
await transformPostsInChannelRecord({action: OperationType.CREATE, database, value: {record: undefined, raw: {channel_id: channelId, earliest: 25, latest: 40}}}),
newChunk,
await transformPostsInChannelRecord({action: OperationType.CREATE, database, value: {record: undefined, raw: {channel_id: channelId, earliest: 110, latest: 125}}}),
];
const result = await mergePostInChannelChunks(newChunk, chunks);
expect(result.length).toBe(0);
expect(newChunk.earliest).toBe(50);
expect(newChunk.latest).toBe(100);
expect(newChunk._preparedState).toBe('create');
for (const chunk of chunks) {
expect(chunk._preparedState).toBe('create');
}
await operator.batchRecords(result, 'test');
});
});

View File

@@ -15,7 +15,6 @@ import {
} from '@database/operator/server_data_operator/transformers/post';
import {getRawRecordPairs, getUniqueRawsBy, getValidRecordsForUpdate} from '@database/operator/utils/general';
import {createPostsChain, getPostListEdges} from '@database/operator/utils/post';
import {emptyFunction} from '@utils/general';
import {logWarning} from '@utils/log';
import type ServerDataOperatorBase from '.';
@@ -55,6 +54,56 @@ export interface PostHandlerMix {
handleReceivedPostsInThread: (postsMap: Record<string, Post[]>, prepareRecordsOnly?: boolean) => Promise<PostsInThreadModel[]>;
}
/**
 * Reconciles a posts-in-channel chunk with the chunks already stored for the channel.
 *
 * Any existing chunk fully contained in `newChunk`'s [earliest, latest] range is
 * prepared for permanent deletion; any chunk overlapping either end is absorbed
 * (the new chunk's bounds are widened to cover it) and then prepared for deletion.
 * Disjoint chunks, and the new chunk's own record (matched by id), are ignored.
 *
 * Returns the batch of prepared records (deletions plus, when its bounds changed,
 * the updated new chunk) ready to be passed to batchRecords. The batch is empty
 * when nothing intersects.
 */
const mergePostInChannelChunks = async (newChunk: PostsInChannelModel, existingChunks: PostsInChannelModel[]) => {
    const batch: PostsInChannelModel[] = [];
    let boundsChanged = false;

    for (const existing of existingChunks) {
        // Once a chunk ends before the new chunk starts, no later chunk can
        // intersect — assumes existingChunks is sorted by latest desc; TODO confirm with callers.
        if (existing.latest < newChunk.earliest) {
            break;
        }

        // Never merge the new chunk with its own stored record.
        if (existing.id === newChunk.id) {
            continue;
        }

        // Fully contained chunks are redundant once the new chunk is stored.
        const isContained = newChunk.earliest <= existing.earliest && newChunk.latest >= existing.latest;
        if (isContained) {
            batch.push(existing.prepareDestroyPermanently());
            continue;
        }

        // Partial overlap at either end: absorb the chunk into the new one.
        const overlapsStart = newChunk.earliest <= existing.earliest && newChunk.latest >= existing.earliest;
        const overlapsEnd = newChunk.latest >= existing.latest && newChunk.earliest >= existing.earliest;
        if (overlapsStart || overlapsEnd) {
            boundsChanged = true;

            // We may be updating this record several times, but our patches in the WatermelonDB library
            // should handle that gracefully.
            newChunk.prepareUpdate((r) => {
                r.earliest = Math.min(r.earliest, existing.earliest);
                r.latest = Math.max(r.latest, existing.latest);
            });
            batch.push(existing.prepareDestroyPermanently());
        }
    }

    if (boundsChanged) {
        // We may be adding this record twice in the caller if prepareRecordsOnly is true, but our patches in
        // the WatermelonDB library should handle that gracefully.
        batch.push(newChunk);
    }

    return batch;
};
// Surface internal helpers to the unit tests without widening the module's public API.
export const exportedForTest = {mergePostInChannelChunks};
const PostHandler = <TBase extends Constructor<ServerDataOperatorBase>>(superclass: TBase) => class extends superclass {
/**
* handleDraft: Handler responsible for the Create/Update operations occurring the Draft table from the 'Server' schema
@@ -337,7 +386,7 @@ const PostHandler = <TBase extends Constructor<ServerDataOperatorBase>>(supercla
case ActionType.POSTS.RECEIVED_BEFORE:
return this.handleReceivedPostsInChannelBefore(posts, prepareRecordsOnly) as Promise<PostsInChannelModel[]>;
case ActionType.POSTS.RECEIVED_NEW:
return this.handleReceivedPostForChannel(posts, prepareRecordsOnly) as Promise<PostsInChannelModel[]>;
return this.handleReceivedNewPostForChannel(posts, prepareRecordsOnly) as Promise<PostsInChannelModel[]>;
}
return [];
@@ -347,35 +396,22 @@ const PostHandler = <TBase extends Constructor<ServerDataOperatorBase>>(supercla
// POST IN CHANNEL
// ========================
_createPostsInChannelRecord = (channelId: string, earliest: number, latest: number, prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
// We should prepare instead of execute
if (prepareRecordsOnly) {
return this.prepareRecords({
tableName: POSTS_IN_CHANNEL,
createRaws: [{record: undefined, raw: {channel_id: channelId, earliest, latest}}],
transformer: transformPostsInChannelRecord,
});
}
return this.execute({
createRaws: [{record: undefined, raw: {channel_id: channelId, earliest, latest}}],
_createPostsInChannelRecord = async (channelId: string, earliest: number, latest: number, prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
const models = await this.prepareRecords({
tableName: POSTS_IN_CHANNEL,
createRaws: [{record: undefined, raw: {channel_id: channelId, earliest, latest}}],
transformer: transformPostsInChannelRecord,
}, '_createPostsInChannelRecord');
});
if (!prepareRecordsOnly) {
await this.batchRecords(models, '_createPostsInChannelRecord');
}
return models;
};
_mergePostInChannelChunks = async (newChunk: PostsInChannelModel, existingChunks: PostsInChannelModel[], prepareRecordsOnly = false) => {
const result: PostsInChannelModel[] = [];
for (const chunk of existingChunks) {
if (newChunk.earliest <= chunk.earliest && newChunk.latest >= chunk.latest) {
if (!prepareRecordsOnly) {
newChunk.prepareUpdate(emptyFunction);
}
result.push(newChunk);
result.push(chunk.prepareDestroyPermanently());
break;
}
}
const result = await mergePostInChannelChunks(newChunk, existingChunks);
if (result.length && !prepareRecordsOnly) {
await this.batchRecords(result, '_mergePostInChannelChunks');
}
@@ -392,14 +428,8 @@ const PostHandler = <TBase extends Constructor<ServerDataOperatorBase>>(supercla
}
const {firstPost, lastPost} = getPostListEdges(posts);
// Channel Id for this chain of posts
const channelId = firstPost.channel_id;
// Find smallest 'create_at' value in chain
const earliest = firstPost.create_at;
// Find highest 'create_at' value in chain; -1 means we are dealing with one item in the posts array
const latest = lastPost.create_at;
// Find the records in the PostsInChannel table that have a matching channel_id
@@ -409,51 +439,53 @@ const PostHandler = <TBase extends Constructor<ServerDataOperatorBase>>(supercla
).fetch()) as PostsInChannelModel[];
// chunk length 0; then it's a new chunk to be added to the PostsInChannel table
if (chunks.length === 0) {
if (!chunks.length) {
return this._createPostsInChannelRecord(channelId, earliest, latest, prepareRecordsOnly);
}
let targetChunk: PostsInChannelModel|undefined;
for (const chunk of chunks) {
// find if we should plug the chain before
if (firstPost.create_at >= chunk.earliest || latest <= chunk.latest) {
// find if we should plug the chain along an existing chunk
if (
(earliest >= chunk.earliest && earliest <= chunk.latest) ||
(latest <= chunk.latest && latest >= chunk.earliest)
) {
targetChunk = chunk;
break;
}
}
if (targetChunk) {
if (
targetChunk.earliest <= earliest &&
targetChunk.latest >= latest
) {
return [];
}
// If the chunk was found, Update the chunk and return
if (prepareRecordsOnly) {
targetChunk.prepareUpdate((record) => {
record.earliest = Math.min(record.earliest, earliest);
record.latest = Math.max(record.latest, latest);
});
return [targetChunk];
}
targetChunk = await this.database.write(async () => {
return targetChunk!.update((record) => {
record.earliest = Math.min(record.earliest, earliest);
record.latest = Math.max(record.latest, latest);
});
});
return [targetChunk!];
if (!targetChunk) {
// Create a new chunk and merge them if needed
const models = [];
models.push(...await this._createPostsInChannelRecord(channelId, earliest, latest, prepareRecordsOnly));
models.push(...await this._mergePostInChannelChunks(models[0], chunks, prepareRecordsOnly));
return models;
}
// Create a new chunk and merge them if needed
const newChunk = await this._createPostsInChannelRecord(channelId, earliest, latest, prepareRecordsOnly);
const merged = await this._mergePostInChannelChunks(newChunk[0], chunks, prepareRecordsOnly);
return merged;
// Check if the new chunk is contained by the existing chunk
if (
targetChunk.earliest <= earliest &&
targetChunk.latest >= latest
) {
return [];
}
const models = [];
// If the chunk was found, Update the chunk and return
models.push(targetChunk.prepareUpdate((record) => {
record.earliest = Math.min(record.earliest, earliest);
record.latest = Math.max(record.latest, latest);
}));
models.push(...await this._mergePostInChannelChunks(targetChunk, chunks, prepareRecordsOnly));
if (!prepareRecordsOnly) {
this.batchRecords(models, 'handleReceivedPostsInChannel');
}
return models;
};
handleReceivedPostsInChannelSince = async (posts: Post[], prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
@@ -464,46 +496,37 @@ const PostHandler = <TBase extends Constructor<ServerDataOperatorBase>>(supercla
return [];
}
const {firstPost} = getPostListEdges(posts);
let latest = 0;
const {firstPost, lastPost} = getPostListEdges(posts);
const channelId = firstPost.channel_id;
const latest = lastPost.create_at;
let recentChunk: PostsInChannelModel|undefined;
const chunks = (await this.database.get(POSTS_IN_CHANNEL).query(
Q.where('channel_id', firstPost.channel_id),
Q.where('channel_id', channelId),
Q.sortBy('latest', Q.desc),
).fetch()) as PostsInChannelModel[];
if (chunks.length) {
recentChunk = chunks[0];
// add any new recent post while skipping the ones that were just updated
for (const post of posts) {
if (post.create_at > recentChunk.latest) {
latest = post.create_at;
}
}
if (!chunks.length) {
// Create a new chunk in case somehow the chunks got deleted for this channel
const earliest = firstPost.create_at;
return this._createPostsInChannelRecord(channelId, earliest, latest, prepareRecordsOnly);
}
if (recentChunk && recentChunk.latest < latest) {
// We've got new posts that belong to this chunk
if (prepareRecordsOnly) {
recentChunk.prepareUpdate((record) => {
record.latest = Math.max(record.latest, latest);
});
const targetChunk = chunks[0];
return [recentChunk];
}
recentChunk = await this.database.write(async () => {
return recentChunk!.update((record) => {
record.latest = Math.max(record.latest, latest);
});
});
return [recentChunk!];
if (targetChunk.latest >= latest) {
return [];
}
return [];
// We've got new posts that belong to this chunk
const models = [targetChunk.prepareUpdate((record) => {
record.latest = Math.max(record.latest, latest);
})];
if (!prepareRecordsOnly) {
this.batchRecords(models, 'handleReceivedPostsInChannelSince');
}
return models;
};
handleReceivedPostsInChannelBefore = async (posts: Post[], prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
@@ -514,12 +537,8 @@ const PostHandler = <TBase extends Constructor<ServerDataOperatorBase>>(supercla
return [];
}
const {firstPost} = getPostListEdges(posts);
// Channel Id for this chain of posts
const {firstPost, lastPost} = getPostListEdges(posts);
const channelId = firstPost.channel_id;
// Find smallest 'create_at' value in chain
const earliest = firstPost.create_at;
// Find the records in the PostsInChannel table that have a matching channel_id
@@ -528,42 +547,38 @@ const PostHandler = <TBase extends Constructor<ServerDataOperatorBase>>(supercla
Q.sortBy('latest', Q.desc),
).fetch()) as PostsInChannelModel[];
if (chunks.length === 0) {
// No chunks found, previous posts in this block not found
if (!chunks.length) {
// Create a new chunk in case somehow the chunks got deleted for this channel
const latest = lastPost.create_at;
return this._createPostsInChannelRecord(channelId, earliest, latest, prepareRecordsOnly);
}
const targetChunk = chunks[0];
if (targetChunk.earliest <= earliest) {
return [];
}
let targetChunk = chunks[0];
if (targetChunk) {
if (targetChunk.earliest <= earliest) {
return [];
}
const models = [];
// If the chunk was found, Update the chunk and return
if (prepareRecordsOnly) {
targetChunk.prepareUpdate((record) => {
record.earliest = Math.min(record.earliest, earliest);
});
return [targetChunk];
}
// If the chunk was found, Update the chunk and return
models.push(targetChunk.prepareUpdate((record) => {
record.earliest = Math.min(record.earliest, earliest);
}));
targetChunk = await this.database.write(async () => {
return targetChunk!.update((record) => {
record.earliest = Math.min(record.earliest, earliest);
});
});
models.push(...(await this._mergePostInChannelChunks(targetChunk, chunks, prepareRecordsOnly)));
return [targetChunk!];
if (!prepareRecordsOnly) {
this.batchRecords(models, 'handleReceivedPostsInChannelBefore');
}
return targetChunk;
return models;
};
handleReceivedPostsInChannelAfter = async (posts: Post[], prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
throw new Error(`handleReceivedPostsInChannelAfter Not implemented yet. posts count${posts.length} prepareRecordsOnly=${prepareRecordsOnly}`);
};
handleReceivedPostForChannel = async (posts: Post[], prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
handleReceivedNewPostForChannel = async (posts: Post[], prepareRecordsOnly = false): Promise<PostsInChannelModel[]> => {
if (!posts?.length) {
logWarning(
'An empty or undefined "posts" array has been passed to the handleReceivedPostForChannel method',
@@ -572,14 +587,8 @@ const PostHandler = <TBase extends Constructor<ServerDataOperatorBase>>(supercla
}
const {firstPost, lastPost} = getPostListEdges(posts);
// Channel Id for this chain of posts
const channelId = firstPost.channel_id;
// Find smallest 'create_at' value in chain
const earliest = firstPost.create_at;
// Find highest 'create_at' value in chain; -1 means we are dealing with one item in the posts array
const latest = lastPost.create_at;
// Find the records in the PostsInChannel table that have a matching channel_id
@@ -593,30 +602,20 @@ const PostHandler = <TBase extends Constructor<ServerDataOperatorBase>>(supercla
return this._createPostsInChannelRecord(channelId, earliest, latest, prepareRecordsOnly);
}
let targetChunk = chunks[0];
if (targetChunk) {
if (targetChunk.latest >= latest) {
return [];
}
// If the chunk was found, Update the chunk and return
if (prepareRecordsOnly) {
targetChunk.prepareUpdate((record) => {
record.latest = Math.max(record.latest, latest);
});
return [targetChunk];
}
targetChunk = await this.database.write(async () => {
return targetChunk!.update((record) => {
record.latest = Math.max(record.latest, latest);
});
});
return [targetChunk!];
const targetChunk = chunks[0];
if (targetChunk.latest >= latest) {
return [];
}
return targetChunk;
// If the chunk was found, Update the chunk and return
targetChunk.prepareUpdate((record) => {
record.latest = Math.max(record.latest, latest);
});
if (!prepareRecordsOnly) {
this.batchRecords([targetChunk], 'handleReceivedNewPostForChannel');
}
return [targetChunk];
};
// ========================