MM-40093 [Gekidou]: adds remote action to sync threads (#6135)

* MM-40093: adds remote action to sync threads

Syncing threads fetches all unread threads if there are no threads in
the DB.
If there are threads in the DB it fetches all threads newest than the
newest one we have stored.

Fetching happens in batches of 60 threads at a time.

* Fixes querying for all threads list

* Syncs threads on WS reconnection

* Addresses review comments

* Addresses review comments
This commit is contained in:
Kyriakos Z
2022-04-12 15:03:05 +03:00
committed by GitHub
parent df3ef72a0a
commit 477c7cf1bf
6 changed files with 226 additions and 22 deletions

View File

@@ -18,6 +18,9 @@ import {queryAllChannelsForTeam} from '@queries/servers/channel';
import {getConfig} from '@queries/servers/system';
import {deleteMyTeams, getAvailableTeamIds, queryMyTeams, queryMyTeamsByIds, queryTeamsById} from '@queries/servers/team';
import {isDMorGM} from '@utils/channel';
import {isCRTEnabled} from '@utils/thread';
import {fetchNewThreads} from '../thread';
import type ClientError from '@client/rest/error';
@@ -209,6 +212,21 @@ export async function deferredAppEntryActions(
await fetchTeamsChannelsAndUnreadPosts(serverUrl, since, teamData.teams, teamData.memberships, initialTeamId);
}
if (preferences && isCRTEnabled(preferences, config)) {
if (initialTeamId) {
await fetchNewThreads(serverUrl, initialTeamId, false);
}
if (teamData.teams?.length) {
for await (const team of teamData.teams) {
if (team.id !== initialTeamId) {
// need to await here since GM/DM threads in different teams overlap
await fetchNewThreads(serverUrl, team.id, false);
}
}
}
}
fetchAllTeams(serverUrl);
updateAllUsersSince(serverUrl, since);
}

View File

@@ -9,10 +9,14 @@ import NetworkManager from '@init/network_manager';
import {getChannelById} from '@queries/servers/channel';
import {getPostById} from '@queries/servers/post';
import {getCommonSystemValues} from '@queries/servers/system';
import {getIsCRTEnabled, getThreadById} from '@queries/servers/thread';
import {getIsCRTEnabled, getNewestThreadInTeam, getThreadById} from '@queries/servers/thread';
import {getCurrentUser} from '@queries/servers/user';
import {forceLogoutIfNecessary} from './session';
import type {Client} from '@client/rest';
import type {Model} from '@nozbe/watermelondb';
type FetchThreadsRequest = {
error?: unknown;
} | {
@@ -26,7 +30,8 @@ type FetchThreadsOptions = {
deleted?: boolean;
unread?: boolean;
since?: number;
}
totalsOnly?: boolean;
};
export const fetchAndSwitchToThread = async (serverUrl: string, rootId: string) => {
const database = DatabaseManager.serverDatabases[serverUrl]?.database;
@@ -67,7 +72,7 @@ export const fetchThreads = async (
perPage = General.CRT_CHUNK_SIZE,
deleted = false,
unread = false,
since = 0,
since,
}: FetchThreadsOptions = {
perPage: General.CRT_CHUNK_SIZE,
deleted: false,
@@ -90,7 +95,7 @@ export const fetchThreads = async (
try {
const {config} = await getCommonSystemValues(database);
const data = await client.getThreads('me', teamId, before, after, perPage, deleted, unread, since, config.Version);
const data = await client.getThreads('me', teamId, before, after, perPage, deleted, unread, since, false, config.Version);
const {threads} = data;
@@ -194,3 +199,151 @@ export const updateThreadFollowing = async (serverUrl: string, teamId: string, t
return {error};
}
};
enum Direction {
Up,
Down,
}
async function fetchBatchThreads(
serverUrl: string,
teamId: string,
options: FetchThreadsOptions,
pages?: number,
): Promise<{error: unknown; data?: Thread[]}> {
const operator = DatabaseManager.serverDatabases[serverUrl]?.operator;
if (!operator) {
return {error: `${serverUrl} database not found`};
}
let client: Client;
try {
client = NetworkManager.getClient(serverUrl);
} catch (error) {
return {error};
}
// if we start from the begging of time (since = 0) we need to fetch threads from newest to oldest (Direction.Down)
// if there is another point in time, we need to fetch threads from oldest to newest (Direction.Up)
let direction = Direction.Up;
if (options.since === 0) {
direction = Direction.Down;
}
const currentUser = await getCurrentUser(operator.database);
if (!currentUser) {
return {error: 'currentUser not found'};
}
const {config} = await getCommonSystemValues(operator.database);
const data: Thread[] = [];
const fetchThreadsFunc = async (opts: FetchThreadsOptions) => {
let page = 0;
const {before, after, perPage = General.CRT_CHUNK_SIZE, deleted, unread, since} = opts;
page += 1;
const {threads} = await client.getThreads(currentUser.id, teamId, before, after, perPage, deleted, unread, since, false, config.Version);
if (threads.length) {
// Mark all fetched threads as following
for (const thread of threads) {
thread.is_following = true;
}
data.push(...threads);
if (threads.length === perPage) {
const newOptions: FetchThreadsOptions = {perPage, deleted, unread};
if (direction === Direction.Down) {
const last = threads[threads.length - 1];
newOptions.before = last.id;
} else {
const first = threads[0];
newOptions.after = first.id;
}
if (pages != null && page < pages) {
fetchThreadsFunc(newOptions);
}
}
}
};
try {
await fetchThreadsFunc(options);
} catch (error) {
if (__DEV__) {
throw error;
}
return {error, data};
}
return {error: false, data};
}
export async function fetchNewThreads(
serverUrl: string,
teamId: string,
prepareRecordsOnly = false,
): Promise<{error: unknown; models?: Model[]}> {
const options: FetchThreadsOptions = {
unread: false,
deleted: true,
perPage: 60,
};
const operator = DatabaseManager.serverDatabases[serverUrl]?.operator;
if (!operator) {
return {error: `${serverUrl} database not found`};
}
const newestThread = await getNewestThreadInTeam(operator.database, teamId, false);
options.since = newestThread ? newestThread.lastReplyAt : 0;
let response = {
error: undefined,
data: [],
} as {
error: unknown;
data?: Thread[];
};
let loadedInGlobalThreads = true;
// if we have no threads in the DB fetch all unread ones
if (options.since === 0) {
// options to fetch all unread threads
options.deleted = false;
options.unread = true;
loadedInGlobalThreads = false;
}
response = await fetchBatchThreads(serverUrl, teamId, options);
const {error: nErr, data} = response;
if (nErr) {
return {error: nErr};
}
if (!data?.length) {
return {error: false, models: []};
}
const {error, models} = await processReceivedThreads(serverUrl, data, teamId, true, loadedInGlobalThreads);
if (!error && !prepareRecordsOnly && models?.length) {
try {
await operator.batchRecords(models);
} catch (err) {
if (__DEV__) {
throw err;
}
return {error: true};
}
}
return {error: false, models};
}

View File

@@ -7,6 +7,7 @@ import {fetchPostsForUnreadChannels, fetchPostsSince} from '@actions/remote/post
import {fetchRoles} from '@actions/remote/role';
import {fetchConfigAndLicense} from '@actions/remote/systems';
import {fetchAllTeams, fetchTeamsChannelsAndUnreadPosts} from '@actions/remote/team';
import {fetchNewThreads} from '@actions/remote/thread';
import {fetchStatusByIds, updateAllUsersSince} from '@actions/remote/user';
import {WebsocketEvents} from '@constants';
import {SYSTEM_IDENTIFIERS} from '@constants/database';
@@ -17,6 +18,7 @@ import {prepareModels} from '@queries/servers/entry';
import {getCommonSystemValues, getConfig, getCurrentChannelId, getWebSocketLastDisconnected, resetWebSocketLastDisconnected, setCurrentTeamAndChannelId} from '@queries/servers/system';
import {isDMorGM} from '@utils/channel';
import {isTablet} from '@utils/helpers';
import {isCRTEnabled} from '@utils/thread';
import {handleCategoryCreatedEvent, handleCategoryDeletedEvent, handleCategoryOrderUpdatedEvent, handleCategoryUpdatedEvent} from './category';
import {handleChannelConvertedEvent, handleChannelCreatedEvent,
@@ -197,6 +199,21 @@ async function doReconnect(serverUrl: string) {
await fetchTeamsChannelsAndUnreadPosts(serverUrl, lastDisconnectedAt, teamData.teams, teamData.memberships, initialTeamId);
}
if (prefData.preferences && isCRTEnabled(prefData.preferences, config)) {
if (initialTeamId) {
await fetchNewThreads(serverUrl, initialTeamId, false);
}
if (teamData.teams?.length) {
for await (const team of teamData.teams) {
if (team.id !== initialTeamId) {
// need to await here since GM/DM threads in different teams overlap
await fetchNewThreads(serverUrl, team.id, false);
}
}
}
}
fetchAllTeams(serverUrl);
updateAllUsersSince(serverUrl, lastDisconnectedAt);

View File

@@ -6,7 +6,7 @@ import {buildQueryString, isMinimumServerVersion} from '@utils/helpers';
import {PER_PAGE_DEFAULT} from './constants';
export interface ClientThreadsMix {
getThreads: (userId: string, teamId: string, before?: string, after?: string, pageSize?: number, deleted?: boolean, unread?: boolean, since?: number, serverVersion?: string) => Promise<GetUserThreadsResponse>;
getThreads: (userId: string, teamId: string, before?: string, after?: string, pageSize?: number, deleted?: boolean, unread?: boolean, since?: number, totalsOnly?: boolean, serverVersion?: string) => Promise<GetUserThreadsResponse>;
getThread: (userId: string, teamId: string, threadId: string, extended?: boolean) => Promise<any>;
updateTeamThreadsAsRead: (userId: string, teamId: string) => Promise<any>;
updateThreadRead: (userId: string, teamId: string, threadId: string, timestamp: number) => Promise<any>;
@@ -14,7 +14,7 @@ export interface ClientThreadsMix {
}
const ClientThreads = (superclass: any) => class extends superclass {
getThreads = async (userId: string, teamId: string, before = '', after = '', pageSize = PER_PAGE_DEFAULT, deleted = false, unread = false, since = 0, serverVersion = '') => {
getThreads = async (userId: string, teamId: string, before = '', after = '', pageSize = PER_PAGE_DEFAULT, deleted = false, unread = false, since = 0, totalsOnly = false, serverVersion = '') => {
const queryStringObj: Record<string, any> = {
extended: 'true',
before,
@@ -22,6 +22,7 @@ const ClientThreads = (superclass: any) => class extends superclass {
deleted,
unread,
since,
totalsOnly,
};
if (serverVersion && isMinimumServerVersion(serverVersion, 6, 0)) {
queryStringObj.per_page = pageSize;

View File

@@ -16,7 +16,7 @@ import type ServerDataOperator from '@database/operator/server_data_operator';
import type Model from '@nozbe/watermelondb/Model';
import type ThreadModel from '@typings/database/models/servers/thread';
const {SERVER: {CHANNEL, POST, THREAD}} = MM_TABLES;
const {SERVER: {THREADS_IN_TEAM, THREAD, POST, CHANNEL}} = MM_TABLES;
export const getIsCRTEnabled = async (database: Database): Promise<boolean> => {
const config = await getConfig(database);
@@ -96,10 +96,8 @@ export const prepareThreadsFromReceivedPosts = async (operator: ServerDataOperat
return models;
};
export const queryThreadsInTeam = (database: Database, teamId: string, onlyUnreads?: boolean, hasReplies?: boolean, isFollowing?: boolean, sort?: boolean): Query<ThreadModel> => {
const query: Q.Clause[] = [
Q.experimentalNestedJoin(POST, CHANNEL),
];
export const queryThreadsInTeam = (database: Database, teamId: string, onlyUnreads?: boolean, hasReplies?: boolean, isFollowing?: boolean, sort?: boolean, limit?: number): Query<ThreadModel> => {
const query: Q.Clause[] = [];
if (isFollowing) {
query.push(Q.where('is_following', true));
@@ -117,22 +115,39 @@ export const queryThreadsInTeam = (database: Database, teamId: string, onlyUnrea
query.push(Q.sortBy('last_reply_at', Q.desc));
}
let joinCondition: Q.Condition = Q.where('team_id', teamId);
if (!onlyUnreads) {
joinCondition = Q.and(
Q.where('team_id', teamId),
Q.where('loaded_in_global_threads', true),
);
}
query.push(
Q.on(
POST,
Q.on(
CHANNEL,
Q.or(
Q.where('team_id', teamId),
Q.where('team_id', ''),
),
),
),
Q.on(THREADS_IN_TEAM, joinCondition),
);
if (limit) {
query.push(Q.take(limit));
}
return database.get<ThreadModel>(THREAD).query(...query);
};
export async function getNewestThreadInTeam(
database: Database,
teamId: string,
unread: boolean,
): Promise<ThreadModel | undefined> {
try {
const threads = await queryThreadsInTeam(database, teamId, unread, true, true, true, 1).fetch();
return threads?.[0] || undefined;
} catch (e) {
return undefined;
}
}
export function observeThreadMentionCount(database: Database, teamId?: string, includeDmGm?: boolean): Observable<number> {
return observeUnreadsAndMentionsInTeam(database, teamId, includeDmGm).pipe(
switchMap(({mentions}) => of$(mentions)),

View File

@@ -6,7 +6,7 @@ import {getPreferenceValue} from '@helpers/api/preference';
import type PreferenceModel from '@typings/database/models/servers/preference';
export function isCRTEnabled(preferences: PreferenceModel[], config?: ClientConfig): boolean {
export function isCRTEnabled(preferences: PreferenceModel[]|PreferenceType[], config?: ClientConfig): boolean {
let preferenceDefault = Preferences.COLLAPSED_REPLY_THREADS_OFF;
const configValue = config?.CollapsedThreads;
if (configValue === Config.DEFAULT_ON) {