forked from Ivasoft/mattermost-mobile
Add column last_fetched_at to MyChannel & Thread tables and the migration (#6433)
* Add column last_fetched_at to MyChannel & Thread tables and the migration * Fix schema tests * Handle lastFetchAt, retrieve threads on init and properly observe thread unreads (#6436) * [Gekidou] Set lastFetchAt when fetching posts for a channel (#6437) * Set lastFetchAt when fetching posts for a channel * When resetting _preparedState set always to null * Revert changes in WS * Handle and set lastFetchedAt for MyChannel in iOS push notification * feedback review * iOS fallback to last post createAt if no lastFetchAt set * Handle lastFetchAt on Android push notifications * create storePostsForChannel local action * Fix iOS fallback to last post create_at Co-authored-by: Daniel Espino García <larkox@gmail.com>
This commit is contained in:
@@ -120,7 +120,7 @@ class DatabaseHelper {
|
||||
return result.getString("value")
|
||||
}
|
||||
|
||||
fun queryPostSinceForChannel(db: Database?, channelId: String): Double? {
|
||||
fun queryLastPostCreateAt(db: Database?, channelId: String): Double? {
|
||||
if (db != null) {
|
||||
val postsInChannelQuery = "SELECT earliest, latest FROM PostsInChannel WHERE channel_id=? ORDER BY latest DESC LIMIT 1"
|
||||
val cursor1 = db.rawQuery(postsInChannelQuery, arrayOf(channelId))
|
||||
@@ -142,6 +142,23 @@ class DatabaseHelper {
|
||||
return null
|
||||
}
|
||||
|
||||
fun queryPostSinceForChannel(db: Database?, channelId: String): Double? {
|
||||
if (db != null) {
|
||||
val postsInChannelQuery = "SELECT last_fetched_at FROM MyChannel WHERE id=? LIMIT 1"
|
||||
val cursor1 = db.rawQuery(postsInChannelQuery, arrayOf(channelId))
|
||||
if (cursor1.count == 1) {
|
||||
cursor1.moveToFirst()
|
||||
val lastFetchedAt = cursor1.getDouble(0)
|
||||
cursor1.close()
|
||||
if (lastFetchedAt == 0) {
|
||||
return queryLastPostCreateAt(db, channelId)
|
||||
}
|
||||
return lastFetchedAt
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
fun handlePosts(db: Database, postsData: ReadableMap?, channelId: String, receivingThreads: Boolean) {
|
||||
// Posts, PostInChannel, PostInThread, Reactions, Files, CustomEmojis, Users
|
||||
if (postsData != null) {
|
||||
@@ -149,15 +166,26 @@ class DatabaseHelper {
|
||||
val posts = ReadableMapUtils.toJSONObject(postsData.getMap("posts")).toMap()
|
||||
val previousPostId = postsData.getString("prev_post_id")
|
||||
val postsInThread = hashMapOf<String, List<JSONObject>>()
|
||||
val postList = posts.toList()
|
||||
var earliest = 0.0
|
||||
var latest = 0.0
|
||||
var lastFetchedAt = 0.0
|
||||
|
||||
if (ordered != null && posts.isNotEmpty()) {
|
||||
val firstId = ordered.first()
|
||||
val lastId = ordered.last()
|
||||
lastFetchedAt = postList.fold(0.0) { acc, next ->
|
||||
val post = next.second as Map<String, Any?>
|
||||
val createAt = post["create_at"] as Double
|
||||
val updateAt = post["update_at"] as Double
|
||||
val deleteAt = post["delete_at"] as Double
|
||||
val value = maxOf(createAt, updateAt, deleteAt)
|
||||
|
||||
maxOf(value, acc)
|
||||
}
|
||||
var prevPostId = ""
|
||||
|
||||
val sortedPosts = posts.toList().sortedBy { (_, value) ->
|
||||
val sortedPosts = postList.sortedBy { (_, value) ->
|
||||
((value as Map<*, *>).get("create_at") as Double)
|
||||
}
|
||||
|
||||
@@ -207,6 +235,7 @@ class DatabaseHelper {
|
||||
|
||||
if (!receivingThreads) {
|
||||
handlePostsInChannel(db, channelId, earliest, latest)
|
||||
updateMyChannelLastFetchedAt(db, channelId, lastFetchedAt)
|
||||
}
|
||||
handlePostsInThread(db, postsInThread)
|
||||
}
|
||||
@@ -419,8 +448,8 @@ class DatabaseHelper {
|
||||
|
||||
db.execute(
|
||||
"insert into Thread " +
|
||||
"(id, last_reply_at, last_viewed_at, reply_count, is_following, unread_replies, unread_mentions, _status)" +
|
||||
" values (?, ?, ?, ?, ?, ?, ?, 'created')",
|
||||
"(id, last_reply_at, last_fetched_at, last_viewed_at, reply_count, is_following, unread_replies, unread_mentions, _status)" +
|
||||
" values (?, ?, 0, ?, ?, ?, ?, ?, 'created')",
|
||||
arrayOf(
|
||||
thread.getString("id"),
|
||||
thread.getDouble("last_reply_at") ?: 0,
|
||||
@@ -561,6 +590,16 @@ class DatabaseHelper {
|
||||
}
|
||||
}
|
||||
|
||||
private fun updateMyChannelLastFetchedAt(db: Database, channelId: String, lastFetchedAt: Double) {
|
||||
db.execute(
|
||||
"UPDATE MyChannel SET last_fetched_at = ?, _status = 'updated' WHERE id = ?",
|
||||
arrayOf(
|
||||
lastFetchedAt,
|
||||
channelId
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
private fun findPostInChannel(chunks: ReadableArray, earliest: Double, latest: Double): ReadableMap? {
|
||||
for (i in 0 until chunks.size()) {
|
||||
val chunk = chunks.getMap(i)
|
||||
|
||||
@@ -303,21 +303,22 @@ export async function updateChannelInfoFromChannel(serverUrl: string, channel: C
|
||||
export async function updateLastPostAt(serverUrl: string, channelId: string, lastPostAt: number, prepareRecordsOnly = false) {
|
||||
try {
|
||||
const {database, operator} = DatabaseManager.getServerDatabaseAndOperator(serverUrl);
|
||||
const member = await getMyChannel(database, channelId);
|
||||
if (!member) {
|
||||
const myChannel = await getMyChannel(database, channelId);
|
||||
if (!myChannel) {
|
||||
return {error: 'not a member'};
|
||||
}
|
||||
|
||||
if (lastPostAt > member.lastPostAt) {
|
||||
member.prepareUpdate((m) => {
|
||||
if (lastPostAt > myChannel.lastPostAt) {
|
||||
myChannel.resetPreparedState();
|
||||
myChannel.prepareUpdate((m) => {
|
||||
m.lastPostAt = lastPostAt;
|
||||
});
|
||||
|
||||
if (!prepareRecordsOnly) {
|
||||
await operator.batchRecords([member]);
|
||||
await operator.batchRecords([myChannel]);
|
||||
}
|
||||
|
||||
return {member};
|
||||
return {member: myChannel};
|
||||
}
|
||||
|
||||
return {member: undefined};
|
||||
@@ -327,6 +328,34 @@ export async function updateLastPostAt(serverUrl: string, channelId: string, las
|
||||
}
|
||||
}
|
||||
|
||||
export async function updateMyChannelLastFetchedAt(serverUrl: string, channelId: string, lastFetchedAt: number, prepareRecordsOnly = false) {
|
||||
try {
|
||||
const {database, operator} = DatabaseManager.getServerDatabaseAndOperator(serverUrl);
|
||||
const myChannel = await getMyChannel(database, channelId);
|
||||
if (!myChannel) {
|
||||
return {error: 'not a member'};
|
||||
}
|
||||
|
||||
if (lastFetchedAt > myChannel.lastFetchedAt) {
|
||||
myChannel.resetPreparedState();
|
||||
myChannel.prepareUpdate((m) => {
|
||||
m.lastFetchedAt = lastFetchedAt;
|
||||
});
|
||||
|
||||
if (!prepareRecordsOnly) {
|
||||
await operator.batchRecords([myChannel]);
|
||||
}
|
||||
|
||||
return {member: myChannel};
|
||||
}
|
||||
|
||||
return {member: undefined};
|
||||
} catch (error) {
|
||||
logError('Failed updateLastFetchedAt', error);
|
||||
return {error};
|
||||
}
|
||||
}
|
||||
|
||||
type User = UserProfile | UserModel;
|
||||
export async function updateChannelsDisplayName(serverUrl: string, channels: ChannelModel[], users: User[], prepareRecordsOnly = false) {
|
||||
try {
|
||||
|
||||
@@ -5,10 +5,15 @@ import {ActionType, Post} from '@constants';
|
||||
import DatabaseManager from '@database/manager';
|
||||
import {getPostById, prepareDeletePost} from '@queries/servers/post';
|
||||
import {getCurrentUserId} from '@queries/servers/system';
|
||||
import {getIsCRTEnabled, prepareThreadsFromReceivedPosts} from '@queries/servers/thread';
|
||||
import {generateId} from '@utils/general';
|
||||
import {logError} from '@utils/log';
|
||||
import {getLastFetchedAtFromPosts} from '@utils/post';
|
||||
import {getPostIdsForCombinedUserActivityPost} from '@utils/post_list';
|
||||
|
||||
import {updateLastPostAt, updateMyChannelLastFetchedAt} from './channel';
|
||||
|
||||
import type MyChannelModel from '@typings/database/models/servers/my_channel';
|
||||
import type PostModel from '@typings/database/models/servers/post';
|
||||
import type UserModel from '@typings/database/models/servers/user';
|
||||
|
||||
@@ -160,3 +165,71 @@ export async function markPostAsDeleted(serverUrl: string, post: Post, prepareRe
|
||||
return {error};
|
||||
}
|
||||
}
|
||||
|
||||
export async function storePostsForChannel(
|
||||
serverUrl: string, channelId: string, posts: Post[], order: string[], previousPostId: string,
|
||||
actionType: string, authors: UserProfile[], prepareRecordsOnly = false,
|
||||
) {
|
||||
try {
|
||||
const {database, operator} = DatabaseManager.getServerDatabaseAndOperator(serverUrl);
|
||||
|
||||
const isCRTEnabled = await getIsCRTEnabled(database);
|
||||
|
||||
const models = [];
|
||||
const postModels = await operator.handlePosts({
|
||||
actionType,
|
||||
order,
|
||||
posts,
|
||||
previousPostId,
|
||||
prepareRecordsOnly: true,
|
||||
});
|
||||
models.push(...postModels);
|
||||
|
||||
if (authors.length) {
|
||||
const userModels = await operator.handleUsers({users: authors, prepareRecordsOnly: true});
|
||||
models.push(...userModels);
|
||||
}
|
||||
|
||||
const lastFetchedAt = getLastFetchedAtFromPosts(posts);
|
||||
let myChannelModel: MyChannelModel | undefined;
|
||||
if (lastFetchedAt) {
|
||||
const {member} = await updateMyChannelLastFetchedAt(serverUrl, channelId, lastFetchedAt, true);
|
||||
myChannelModel = member;
|
||||
}
|
||||
|
||||
let lastPostAt = 0;
|
||||
for (const post of posts) {
|
||||
const isCrtReply = isCRTEnabled && post.root_id !== '';
|
||||
if (!isCrtReply) {
|
||||
lastPostAt = post.create_at > lastPostAt ? post.create_at : lastPostAt;
|
||||
}
|
||||
}
|
||||
|
||||
if (lastPostAt) {
|
||||
const {member} = await updateLastPostAt(serverUrl, channelId, lastPostAt, true);
|
||||
if (member) {
|
||||
myChannelModel = member;
|
||||
}
|
||||
}
|
||||
|
||||
if (myChannelModel) {
|
||||
models.push(myChannelModel);
|
||||
}
|
||||
|
||||
if (isCRTEnabled) {
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, posts, false);
|
||||
if (threadModels?.length) {
|
||||
models.push(...threadModels);
|
||||
}
|
||||
}
|
||||
|
||||
if (models.length && !prepareRecordsOnly) {
|
||||
await operator.batchRecords(models);
|
||||
}
|
||||
|
||||
return {models};
|
||||
} catch (error) {
|
||||
logError('storePostsForChannel', error);
|
||||
return {error};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -188,7 +188,7 @@ export async function createThreadFromNewPost(serverUrl: string, post: Post, pre
|
||||
});
|
||||
models.push(...threadParticipantModels);
|
||||
} else { // If the post is a root post, then we need to add it to the thread table
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, [post]);
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, [post], false);
|
||||
models.push(...threadModels);
|
||||
}
|
||||
|
||||
@@ -211,6 +211,7 @@ export async function processReceivedThreads(serverUrl: string, threads: Thread[
|
||||
|
||||
const posts: Post[] = [];
|
||||
const users: UserProfile[] = [];
|
||||
const threadsToHandle: ThreadWithLastFetchedAt[] = [];
|
||||
|
||||
// Extract posts & users from the received threads
|
||||
for (let i = 0; i < threads.length; i++) {
|
||||
@@ -221,6 +222,7 @@ export async function processReceivedThreads(serverUrl: string, threads: Thread[
|
||||
users.push(participant);
|
||||
}
|
||||
});
|
||||
threadsToHandle.push({...threads[i], lastFetchedAt: post.create_at});
|
||||
}
|
||||
|
||||
const postModels = await operator.handlePosts({
|
||||
@@ -231,7 +233,7 @@ export async function processReceivedThreads(serverUrl: string, threads: Thread[
|
||||
});
|
||||
|
||||
const threadModels = await operator.handleThreads({
|
||||
threads,
|
||||
threads: threadsToHandle,
|
||||
teamId,
|
||||
prepareRecordsOnly: true,
|
||||
loadedInGlobalThreads,
|
||||
|
||||
@@ -324,13 +324,13 @@ export const syncOtherServers = async (serverUrl: string) => {
|
||||
for (const server of servers) {
|
||||
if (server.url !== serverUrl && server.lastActiveAt > 0) {
|
||||
registerDeviceToken(server.url);
|
||||
syncAllChannelMembers(server.url);
|
||||
syncAllChannelMembersAndThreads(server.url);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const syncAllChannelMembers = async (serverUrl: string) => {
|
||||
const syncAllChannelMembersAndThreads = async (serverUrl: string) => {
|
||||
const database = DatabaseManager.serverDatabases[serverUrl]?.database;
|
||||
if (!database) {
|
||||
return;
|
||||
@@ -345,10 +345,16 @@ const syncAllChannelMembers = async (serverUrl: string) => {
|
||||
|
||||
try {
|
||||
const myTeams = await client.getMyTeams();
|
||||
const preferences = await client.getMyPreferences();
|
||||
const config = await client.getClientConfigOld();
|
||||
|
||||
let excludeDirect = false;
|
||||
for (const myTeam of myTeams) {
|
||||
fetchMyChannelsForTeam(serverUrl, myTeam.id, false, 0, false, excludeDirect);
|
||||
excludeDirect = true;
|
||||
if (preferences && processIsCRTEnabled(preferences, config)) {
|
||||
fetchNewThreads(serverUrl, myTeam.id, false);
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Do nothing
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
import {DeviceEventEmitter} from 'react-native';
|
||||
|
||||
import {markChannelAsUnread, updateLastPostAt} from '@actions/local/channel';
|
||||
import {removePost} from '@actions/local/post';
|
||||
import {removePost, storePostsForChannel} from '@actions/local/post';
|
||||
import {addRecentReaction} from '@actions/local/reactions';
|
||||
import {createThreadFromNewPost} from '@actions/local/thread';
|
||||
import {ActionType, Events, General, Post, ServerErrors} from '@constants';
|
||||
@@ -17,7 +17,7 @@ import {filterPostsInOrderedArray} from '@helpers/api/post';
|
||||
import {getNeededAtMentionedUsernames} from '@helpers/api/user';
|
||||
import {extractRecordsForTable} from '@helpers/database';
|
||||
import NetworkManager from '@managers/network_manager';
|
||||
import {prepareMissingChannelsForAllTeams, queryAllMyChannel} from '@queries/servers/channel';
|
||||
import {getMyChannel, prepareMissingChannelsForAllTeams, queryAllMyChannel} from '@queries/servers/channel';
|
||||
import {queryAllCustomEmojis} from '@queries/servers/custom_emoji';
|
||||
import {getPostById, getRecentPostsInChannel} from '@queries/servers/post';
|
||||
import {getCurrentUserId, getCurrentChannelId} from '@queries/servers/system';
|
||||
@@ -280,84 +280,45 @@ export const fetchPostsForCurrentChannel = async (serverUrl: string) => {
|
||||
};
|
||||
|
||||
export async function fetchPostsForChannel(serverUrl: string, channelId: string, fetchOnly = false) {
|
||||
const operator = DatabaseManager.serverDatabases[serverUrl]?.operator;
|
||||
if (!operator) {
|
||||
return {error: `${serverUrl} database not found`};
|
||||
}
|
||||
try {
|
||||
const {database} = DatabaseManager.getServerDatabaseAndOperator(serverUrl);
|
||||
let postAction: Promise<PostsRequest>|undefined;
|
||||
let actionType: string|undefined;
|
||||
const myChannel = await getMyChannel(database, channelId);
|
||||
const postsInChannel = await getRecentPostsInChannel(database, channelId);
|
||||
const since = myChannel?.lastFetchedAt || postsInChannel?.[0]?.createAt || 0;
|
||||
if (since) {
|
||||
postAction = fetchPostsSince(serverUrl, channelId, since, true);
|
||||
actionType = ActionType.POSTS.RECEIVED_SINCE;
|
||||
} else {
|
||||
postAction = fetchPosts(serverUrl, channelId, 0, General.POST_CHUNK_SIZE, true);
|
||||
actionType = ActionType.POSTS.RECEIVED_IN_CHANNEL;
|
||||
}
|
||||
|
||||
let postAction: Promise<PostsRequest>|undefined;
|
||||
let actionType: string|undefined;
|
||||
const postsInChannel = await getRecentPostsInChannel(operator.database, channelId);
|
||||
if (!postsInChannel || postsInChannel.length < General.POST_CHUNK_SIZE) {
|
||||
postAction = fetchPosts(serverUrl, channelId, 0, General.POST_CHUNK_SIZE, true);
|
||||
actionType = ActionType.POSTS.RECEIVED_IN_CHANNEL;
|
||||
} else {
|
||||
const since = postsInChannel[0]?.createAt || 0;
|
||||
postAction = fetchPostsSince(serverUrl, channelId, since, true);
|
||||
actionType = ActionType.POSTS.RECEIVED_SINCE;
|
||||
}
|
||||
const data = await postAction;
|
||||
if (data.error) {
|
||||
throw data.error;
|
||||
}
|
||||
|
||||
const data = await postAction;
|
||||
if (data.error) {
|
||||
// Here we should emit an event that fetching posts failed.
|
||||
}
|
||||
|
||||
let authors: UserProfile[] = [];
|
||||
if (data.posts?.length && data.order?.length) {
|
||||
try {
|
||||
let authors: UserProfile[] = [];
|
||||
if (data.posts?.length && data.order?.length) {
|
||||
const {authors: fetchedAuthors} = await fetchPostAuthors(serverUrl, data.posts, true);
|
||||
authors = fetchedAuthors || [];
|
||||
} catch (error) {
|
||||
logError('FETCH AUTHORS ERROR', error);
|
||||
}
|
||||
|
||||
if (!fetchOnly) {
|
||||
const isCRTEnabled = await getIsCRTEnabled(operator.database);
|
||||
|
||||
const models = [];
|
||||
const postModels = await operator.handlePosts({
|
||||
actionType,
|
||||
order: data.order,
|
||||
posts: data.posts,
|
||||
previousPostId: data.previousPostId,
|
||||
prepareRecordsOnly: true,
|
||||
});
|
||||
models.push(...postModels);
|
||||
|
||||
if (authors.length) {
|
||||
const userModels = await operator.handleUsers({users: authors, prepareRecordsOnly: true});
|
||||
models.push(...userModels);
|
||||
}
|
||||
|
||||
let lastPostAt = 0;
|
||||
for (const post of data.posts) {
|
||||
const isCrtReply = isCRTEnabled && post.root_id !== '';
|
||||
if (!isCrtReply) {
|
||||
lastPostAt = post.create_at > lastPostAt ? post.create_at : lastPostAt;
|
||||
}
|
||||
}
|
||||
|
||||
if (lastPostAt) {
|
||||
const {member: memberModel} = await updateLastPostAt(serverUrl, channelId, lastPostAt, true);
|
||||
if (memberModel) {
|
||||
models.push(memberModel);
|
||||
}
|
||||
}
|
||||
|
||||
if (isCRTEnabled) {
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, data.posts);
|
||||
if (threadModels?.length) {
|
||||
models.push(...threadModels);
|
||||
}
|
||||
}
|
||||
|
||||
if (models.length) {
|
||||
await operator.batchRecords(models);
|
||||
if (!fetchOnly) {
|
||||
await storePostsForChannel(
|
||||
serverUrl, channelId,
|
||||
data.posts, data.order, data.previousPostId ?? '',
|
||||
actionType, authors,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return {posts: data.posts, order: data.order, authors, actionType, previousPostId: data.previousPostId};
|
||||
} catch (error) {
|
||||
logError('FetchPostsForChannel', error);
|
||||
return {error};
|
||||
}
|
||||
|
||||
return {posts: data.posts, order: data.order, authors, actionType, previousPostId: data.previousPostId};
|
||||
}
|
||||
|
||||
export const fetchPostsForUnreadChannels = async (serverUrl: string, channels: Channel[], memberships: ChannelMembership[], excludeChannelId?: string, emitEvent = false) => {
|
||||
@@ -389,21 +350,12 @@ export const fetchPostsForUnreadChannels = async (serverUrl: string, channels: C
|
||||
};
|
||||
|
||||
export async function fetchPosts(serverUrl: string, channelId: string, page = 0, perPage = General.POST_CHUNK_SIZE, fetchOnly = false): Promise<PostsRequest> {
|
||||
const operator = DatabaseManager.serverDatabases[serverUrl]?.operator;
|
||||
if (!operator) {
|
||||
return {error: `${serverUrl} database not found`};
|
||||
}
|
||||
let client: Client;
|
||||
try {
|
||||
client = NetworkManager.getClient(serverUrl);
|
||||
} catch (error) {
|
||||
return {error};
|
||||
}
|
||||
|
||||
try {
|
||||
const {operator} = DatabaseManager.getServerDatabaseAndOperator(serverUrl);
|
||||
const client = NetworkManager.getClient(serverUrl);
|
||||
const isCRTEnabled = await getIsCRTEnabled(operator.database);
|
||||
const data = await client.getPosts(channelId, page, perPage, isCRTEnabled, isCRTEnabled);
|
||||
const result = await processPostsFetched(data);
|
||||
const result = processPostsFetched(data);
|
||||
if (!fetchOnly) {
|
||||
const models = await operator.handlePosts({
|
||||
...result,
|
||||
@@ -421,7 +373,7 @@ export async function fetchPosts(serverUrl: string, channelId: string, page = 0,
|
||||
}
|
||||
|
||||
if (isCRTEnabled) {
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, result.posts);
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, result.posts, false);
|
||||
if (threadModels?.length) {
|
||||
models.push(...threadModels);
|
||||
}
|
||||
@@ -479,7 +431,7 @@ export async function fetchPostsBefore(serverUrl: string, channelId: string, pos
|
||||
}
|
||||
|
||||
if (isCRTEnabled) {
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, result.posts);
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, result.posts, false);
|
||||
if (threadModels?.length) {
|
||||
models.push(...threadModels);
|
||||
}
|
||||
@@ -535,7 +487,7 @@ export async function fetchPostsSince(serverUrl: string, channelId: string, sinc
|
||||
}
|
||||
|
||||
if (isCRTEnabled) {
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, result.posts);
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, result.posts, false);
|
||||
if (threadModels?.length) {
|
||||
models.push(...threadModels);
|
||||
}
|
||||
@@ -610,6 +562,7 @@ export const fetchPostAuthors = async (serverUrl: string, posts: Post[], fetchOn
|
||||
|
||||
return {authors: [] as UserProfile[]};
|
||||
} catch (error) {
|
||||
logError('FETCH AUTHORS ERROR', error);
|
||||
forceLogoutIfNecessary(serverUrl, error as ClientErrorProps);
|
||||
return {error};
|
||||
}
|
||||
@@ -649,7 +602,7 @@ export async function fetchPostThread(serverUrl: string, postId: string, fetchOn
|
||||
}
|
||||
|
||||
if (isCRTEnabled) {
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, result.posts);
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, result.posts, true);
|
||||
if (threadModels?.length) {
|
||||
models.push(...threadModels);
|
||||
}
|
||||
@@ -719,7 +672,7 @@ export async function fetchPostsAround(serverUrl: string, channelId: string, pos
|
||||
models.push(...posts);
|
||||
|
||||
if (isCRTEnabled) {
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, data.posts);
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, data.posts, false);
|
||||
if (threadModels?.length) {
|
||||
models.push(...threadModels);
|
||||
}
|
||||
@@ -830,7 +783,7 @@ export async function fetchPostById(serverUrl: string, postId: string, fetchOnly
|
||||
|
||||
const isCRTEnabled = await getIsCRTEnabled(operator.database);
|
||||
if (isCRTEnabled) {
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, [post]);
|
||||
const threadModels = await prepareThreadsFromReceivedPosts(operator, [post], false);
|
||||
if (threadModels?.length) {
|
||||
models.push(...threadModels);
|
||||
}
|
||||
@@ -1052,7 +1005,7 @@ export async function fetchSavedPosts(serverUrl: string, teamId?: string, channe
|
||||
|
||||
const isCRTEnabled = await getIsCRTEnabled(operator.database);
|
||||
if (isCRTEnabled) {
|
||||
promises.push(prepareThreadsFromReceivedPosts(operator, postsArray));
|
||||
promises.push(prepareThreadsFromReceivedPosts(operator, postsArray, false));
|
||||
}
|
||||
|
||||
const modelArrays = await Promise.all(promises);
|
||||
@@ -1134,7 +1087,7 @@ export async function fetchPinnedPosts(serverUrl: string, channelId: string) {
|
||||
);
|
||||
|
||||
if (isCRTEnabled) {
|
||||
promises.push(prepareThreadsFromReceivedPosts(operator, postsArray));
|
||||
promises.push(prepareThreadsFromReceivedPosts(operator, postsArray, false));
|
||||
}
|
||||
|
||||
const modelArrays = await Promise.all(promises);
|
||||
|
||||
@@ -77,7 +77,7 @@ export const searchPosts = async (serverUrl: string, params: PostSearchParams):
|
||||
if (postsArray.length) {
|
||||
const isCRTEnabled = await getIsCRTEnabled(operator.database);
|
||||
if (isCRTEnabled) {
|
||||
promises.push(prepareThreadsFromReceivedPosts(operator, postsArray));
|
||||
promises.push(prepareThreadsFromReceivedPosts(operator, postsArray, false));
|
||||
}
|
||||
|
||||
const {authors} = await fetchPostAuthors(serverUrl, postsArray, true);
|
||||
|
||||
@@ -9,6 +9,7 @@ import {
|
||||
markChannelAsViewed, removeCurrentUserFromChannel, setChannelDeleteAt,
|
||||
storeMyChannelsForTeam, updateChannelInfoFromChannel, updateMyChannelFromWebsocket,
|
||||
} from '@actions/local/channel';
|
||||
import {storePostsForChannel} from '@actions/local/post';
|
||||
import {switchToGlobalThreads} from '@actions/local/thread';
|
||||
import {fetchMissingDirectChannelsInfo, fetchMyChannel, fetchChannelStats, fetchChannelById, switchToChannelById} from '@actions/remote/channel';
|
||||
import {fetchPostsForChannel} from '@actions/remote/post';
|
||||
@@ -281,18 +282,16 @@ export async function handleUserAddedToChannelEvent(serverUrl: string, msg: any)
|
||||
}
|
||||
|
||||
const {posts, order, authors, actionType, previousPostId} = await fetchPostsForChannel(serverUrl, channelId, true);
|
||||
if (actionType) {
|
||||
models.push(...await operator.handlePosts({
|
||||
actionType,
|
||||
order,
|
||||
posts,
|
||||
previousPostId,
|
||||
prepareRecordsOnly: true,
|
||||
}));
|
||||
}
|
||||
if (posts?.length && order?.length) {
|
||||
const {models: prepared} = await storePostsForChannel(
|
||||
serverUrl, channelId,
|
||||
posts, order, previousPostId ?? '',
|
||||
actionType, authors,
|
||||
);
|
||||
|
||||
if (authors?.length) {
|
||||
models.push(...await operator.handleUsers({users: authors, prepareRecordsOnly: true}));
|
||||
if (prepared?.length) {
|
||||
models.push(...prepared);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
const addedUser = getUserById(database, userId);
|
||||
|
||||
@@ -25,6 +25,7 @@ import {serverSchema} from '@database/schema/server';
|
||||
import {queryActiveServer, queryServer, queryServerByIdentifier} from '@queries/app/servers';
|
||||
import {DatabaseType} from '@typings/database/enums';
|
||||
import {emptyFunction} from '@utils/general';
|
||||
import {logDebug, logError} from '@utils/log';
|
||||
import {deleteIOSDatabase, getIOSAppGroupDetails} from '@utils/mattermost_managed';
|
||||
import {hashCode} from '@utils/security';
|
||||
import {removeProtocol} from '@utils/url';
|
||||
@@ -157,6 +158,8 @@ class DatabaseManager {
|
||||
return serverDatabase;
|
||||
} catch (e) {
|
||||
// TODO : report to sentry? Show something on the UI ?
|
||||
|
||||
logError('Error initializing database', e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -461,16 +464,19 @@ class DatabaseManager {
|
||||
private buildMigrationCallbacks = (dbName: string) => {
|
||||
const migrationEvents = {
|
||||
onSuccess: () => {
|
||||
logDebug('DB Migration success', dbName);
|
||||
return DeviceEventEmitter.emit(MIGRATION_EVENTS.MIGRATION_SUCCESS, {
|
||||
dbName,
|
||||
});
|
||||
},
|
||||
onStart: () => {
|
||||
logDebug('DB Migration start', dbName);
|
||||
return DeviceEventEmitter.emit(MIGRATION_EVENTS.MIGRATION_STARTED, {
|
||||
dbName,
|
||||
});
|
||||
},
|
||||
onError: (error: Error) => {
|
||||
logDebug('DB Migration error', dbName);
|
||||
return DeviceEventEmitter.emit(MIGRATION_EVENTS.MIGRATION_ERROR, {
|
||||
dbName,
|
||||
error,
|
||||
|
||||
@@ -1,9 +1,34 @@
|
||||
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
||||
// See LICENSE.txt for license information.
|
||||
|
||||
import {schemaMigrations} from '@nozbe/watermelondb/Schema/migrations';
|
||||
|
||||
// NOTE : To implement migration, please follow this document
|
||||
// https://nozbe.github.io/WatermelonDB/Advanced/Migrations.html
|
||||
|
||||
export default schemaMigrations({migrations: []});
|
||||
import {schemaMigrations, addColumns} from '@nozbe/watermelondb/Schema/migrations';
|
||||
|
||||
import {MM_TABLES} from '@constants/database';
|
||||
|
||||
const {SERVER: {
|
||||
MY_CHANNEL,
|
||||
THREAD,
|
||||
}} = MM_TABLES;
|
||||
|
||||
export default schemaMigrations({migrations: [
|
||||
{
|
||||
toVersion: 2,
|
||||
steps: [
|
||||
addColumns({
|
||||
table: MY_CHANNEL,
|
||||
columns: [
|
||||
{name: 'last_fetched_at', type: 'number', isIndexed: true},
|
||||
],
|
||||
}),
|
||||
addColumns({
|
||||
table: THREAD,
|
||||
columns: [
|
||||
{name: 'last_fetched_at', type: 'number', isIndexed: true},
|
||||
],
|
||||
}),
|
||||
],
|
||||
},
|
||||
]});
|
||||
|
||||
@@ -28,6 +28,9 @@ export default class MyChannelModel extends Model implements MyChannelModelInter
|
||||
/** last_post_at : The timestamp for any last post on this channel */
|
||||
@field('last_post_at') lastPostAt!: number;
|
||||
|
||||
/** last_last_fetched_at_at : The timestamp when we successfully last fetched post on this channel */
|
||||
@field('last_fetched_at') lastFetchedAt!: number;
|
||||
|
||||
/** last_viewed_at : The timestamp showing the user's last viewed post on this channel */
|
||||
@field('last_viewed_at') lastViewedAt!: number;
|
||||
|
||||
@@ -60,4 +63,9 @@ export default class MyChannelModel extends Model implements MyChannelModelInter
|
||||
settings?.destroyPermanently();
|
||||
super.destroyPermanently();
|
||||
}
|
||||
|
||||
resetPreparedState() {
|
||||
// @ts-expect-error hack setting _preparedState
|
||||
this._preparedState = null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,6 +37,9 @@ export default class ThreadModel extends Model implements ThreadModelInterface {
|
||||
/** last_reply_at : The timestamp of when user last replied to the thread. */
|
||||
@field('last_reply_at') lastReplyAt!: number;
|
||||
|
||||
/** last_last_fetched_at_at : The timestamp when we successfully last fetched post on this thread */
|
||||
@field('last_fetched_at') lastFetchedAt!: number;
|
||||
|
||||
/** last_viewed_at : The timestamp of when user last viewed the thread. */
|
||||
@field('last_viewed_at') lastViewedAt!: number;
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
||||
// See LICENSE.txt for license information.
|
||||
|
||||
import {updateMyChannelLastFetchedAt} from '@actions/local/channel';
|
||||
import DatabaseManager from '@database/manager';
|
||||
import {
|
||||
buildChannelMembershipKey,
|
||||
@@ -18,9 +19,10 @@ import ServerDataOperator from '..';
|
||||
|
||||
describe('*** Operator: Channel Handlers tests ***', () => {
|
||||
let operator: ServerDataOperator;
|
||||
const serverUrl = 'baseHandler.test.com';
|
||||
beforeAll(async () => {
|
||||
await DatabaseManager.init(['baseHandler.test.com']);
|
||||
operator = DatabaseManager.serverDatabases['baseHandler.test.com']!.operator;
|
||||
await DatabaseManager.init([serverUrl]);
|
||||
operator = DatabaseManager.serverDatabases[serverUrl]!.operator;
|
||||
});
|
||||
|
||||
it('=> HandleChannel: should write to the CHANNEL table', async () => {
|
||||
@@ -196,6 +198,64 @@ describe('*** Operator: Channel Handlers tests ***', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('=> HandleMyChannel: should keep the previous lastFetchedAt for MY_CHANNEL', async () => {
|
||||
const channels: Channel[] = [{
|
||||
id: 'c',
|
||||
name: 'channel',
|
||||
display_name: 'Channel',
|
||||
type: 'O',
|
||||
create_at: 1,
|
||||
update_at: 1,
|
||||
delete_at: 0,
|
||||
team_id: '123',
|
||||
header: '',
|
||||
purpose: '',
|
||||
last_post_at: 2,
|
||||
creator_id: 'me',
|
||||
total_msg_count: 20,
|
||||
extra_update_at: 0,
|
||||
shared: false,
|
||||
scheme_id: null,
|
||||
group_constrained: false,
|
||||
}];
|
||||
const myChannels: ChannelMembership[] = [
|
||||
{
|
||||
id: 'c',
|
||||
user_id: 'me',
|
||||
channel_id: 'c',
|
||||
last_post_at: 1617311494451,
|
||||
last_viewed_at: 1617311494451,
|
||||
last_update_at: 1617311494451,
|
||||
mention_count: 3,
|
||||
msg_count: 10,
|
||||
roles: 'guest',
|
||||
notify_props: {
|
||||
desktop: 'default',
|
||||
email: 'default',
|
||||
mark_unread: 'mention',
|
||||
push: 'mention',
|
||||
ignore_channel_mentions: 'default',
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
await operator.handleMyChannel({
|
||||
channels,
|
||||
myChannels,
|
||||
prepareRecordsOnly: false,
|
||||
});
|
||||
|
||||
await updateMyChannelLastFetchedAt(serverUrl, 'c', 123456789, false);
|
||||
myChannels[0].last_viewed_at = 1617311494452;
|
||||
const updated = await operator.handleMyChannel({
|
||||
channels,
|
||||
myChannels,
|
||||
prepareRecordsOnly: false,
|
||||
});
|
||||
|
||||
expect(updated[0].lastFetchedAt).toBe(123456789);
|
||||
});
|
||||
|
||||
it('=> HandleChannelMembership: should write to the CHANNEL_MEMBERSHIP table', async () => {
|
||||
expect.assertions(2);
|
||||
const channelMemberships: ChannelMembership[] = [
|
||||
|
||||
@@ -46,8 +46,9 @@ describe('*** Operator: Thread Handlers tests ***', () => {
|
||||
is_following: true,
|
||||
unread_replies: 0,
|
||||
unread_mentions: 0,
|
||||
lastFetchedAt: 0,
|
||||
},
|
||||
] as Thread[];
|
||||
] as ThreadWithLastFetchedAt[];
|
||||
|
||||
const threadsMap = {team_id_1: threads};
|
||||
await operator.handleThreads({threads, loadedInGlobalThreads: false, prepareRecordsOnly: false, teamId: 'team_id_1'});
|
||||
|
||||
@@ -138,6 +138,7 @@ export const transformMyChannelRecord = async ({action, database, value}: Transf
|
||||
myChannel.isUnread = Boolean(raw.is_unread);
|
||||
myChannel.lastViewedAt = raw.last_viewed_at;
|
||||
myChannel.viewedAt = record?.viewedAt || 0;
|
||||
myChannel.lastFetchedAt = record?.lastFetchedAt || 0;
|
||||
};
|
||||
|
||||
return prepareBaseRecord({
|
||||
|
||||
@@ -24,7 +24,7 @@ const {
|
||||
* @returns {Promise<ThreadModel>}
|
||||
*/
|
||||
export const transformThreadRecord = ({action, database, value}: TransformerArgs): Promise<ThreadModel> => {
|
||||
const raw = value.raw as Thread;
|
||||
const raw = value.raw as ThreadWithLastFetchedAt;
|
||||
const record = value.record as ThreadModel;
|
||||
const isCreateAction = action === OperationType.CREATE;
|
||||
|
||||
@@ -38,6 +38,7 @@ export const transformThreadRecord = ({action, database, value}: TransformerArgs
|
||||
thread.unreadReplies = raw.unread_replies ?? record?.unreadReplies ?? 0;
|
||||
thread.unreadMentions = raw.unread_mentions ?? record?.unreadMentions ?? 0;
|
||||
thread.viewedAt = record?.viewedAt || 0;
|
||||
thread.lastFetchedAt = Math.max(record?.lastFetchedAt || 0, raw.lastFetchedAt || 0);
|
||||
};
|
||||
|
||||
return prepareBaseRecord({
|
||||
|
||||
@@ -37,7 +37,7 @@ import {
|
||||
} from './table_schemas';
|
||||
|
||||
export const serverSchema: AppSchema = appSchema({
|
||||
version: 1,
|
||||
version: 2,
|
||||
tables: [
|
||||
CategorySchema,
|
||||
CategoryChannelSchema,
|
||||
|
||||
@@ -18,6 +18,7 @@ export default tableSchema({
|
||||
{name: 'message_count', type: 'number'},
|
||||
{name: 'roles', type: 'string'},
|
||||
{name: 'viewed_at', type: 'number'},
|
||||
{name: 'last_fetched_at', type: 'number', isIndexed: true},
|
||||
],
|
||||
});
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ export default tableSchema({
|
||||
{name: 'unread_mentions', type: 'number'},
|
||||
{name: 'unread_replies', type: 'number'},
|
||||
{name: 'viewed_at', type: 'number'},
|
||||
{name: 'last_fetched_at', type: 'number', isIndexed: true},
|
||||
],
|
||||
});
|
||||
|
||||
|
||||
@@ -43,7 +43,7 @@ const {
|
||||
describe('*** Test schema for SERVER database ***', () => {
|
||||
it('=> The SERVER SCHEMA should strictly match', () => {
|
||||
expect(serverSchema).toEqual({
|
||||
version: 1,
|
||||
version: 2,
|
||||
tables: {
|
||||
[CATEGORY]: {
|
||||
name: CATEGORY,
|
||||
@@ -165,6 +165,7 @@ describe('*** Test schema for SERVER database ***', () => {
|
||||
message_count: {name: 'message_count', type: 'number'},
|
||||
roles: {name: 'roles', type: 'string'},
|
||||
viewed_at: {name: 'viewed_at', type: 'number'},
|
||||
last_fetched_at: {name: 'last_fetched_at', type: 'number', isIndexed: true},
|
||||
},
|
||||
columnArray: [
|
||||
{name: 'is_unread', type: 'boolean'},
|
||||
@@ -175,6 +176,7 @@ describe('*** Test schema for SERVER database ***', () => {
|
||||
{name: 'message_count', type: 'number'},
|
||||
{name: 'roles', type: 'string'},
|
||||
{name: 'viewed_at', type: 'number'},
|
||||
{name: 'last_fetched_at', type: 'number', isIndexed: true},
|
||||
],
|
||||
},
|
||||
[MY_CHANNEL_SETTINGS]: {
|
||||
@@ -523,6 +525,7 @@ describe('*** Test schema for SERVER database ***', () => {
|
||||
unread_mentions: {name: 'unread_mentions', type: 'number'},
|
||||
unread_replies: {name: 'unread_replies', type: 'number'},
|
||||
viewed_at: {name: 'viewed_at', type: 'number'},
|
||||
last_fetched_at: {name: 'last_fetched_at', type: 'number', isIndexed: true},
|
||||
},
|
||||
columnArray: [
|
||||
{name: 'is_following', type: 'boolean'},
|
||||
@@ -532,6 +535,7 @@ describe('*** Test schema for SERVER database ***', () => {
|
||||
{name: 'unread_mentions', type: 'number'},
|
||||
{name: 'unread_replies', type: 'number'},
|
||||
{name: 'viewed_at', type: 'number'},
|
||||
{name: 'last_fetched_at', type: 'number', isIndexed: true},
|
||||
],
|
||||
},
|
||||
[THREAD_PARTICIPANT]: {
|
||||
|
||||
@@ -9,7 +9,7 @@ import {MM_TABLES} from '@constants/database';
|
||||
import DatabaseManager from '@database/manager';
|
||||
import {observeAllMyChannelNotifyProps} from '@queries/servers/channel';
|
||||
import {queryMyTeams} from '@queries/servers/team';
|
||||
import {getIsCRTEnabled, observeThreadMentionCount, queryThreads} from '@queries/servers/thread';
|
||||
import {getIsCRTEnabled, observeThreadMentionCount, queryThreads, observeUnreadsAndMentionsInTeam} from '@queries/servers/thread';
|
||||
|
||||
import type MyChannelModel from '@typings/database/models/servers/my_channel';
|
||||
|
||||
@@ -18,15 +18,16 @@ const {SERVER: {CHANNEL, MY_CHANNEL}} = MM_TABLES;
|
||||
export type UnreadObserverArgs = {
|
||||
myChannels: MyChannelModel[];
|
||||
settings?: Record<string, Partial<ChannelNotifyProps>>;
|
||||
threadUnreads?: boolean;
|
||||
threadMentionCount: number;
|
||||
}
|
||||
|
||||
type ServerUnreadObserver = {
|
||||
(serverUrl: string, {myChannels, settings, threadMentionCount}: UnreadObserverArgs): void;
|
||||
(serverUrl: string, {myChannels, settings, threadMentionCount, threadUnreads}: UnreadObserverArgs): void;
|
||||
}
|
||||
|
||||
type UnreadObserver = {
|
||||
({myChannels, settings, threadMentionCount}: UnreadObserverArgs): void;
|
||||
({myChannels, settings, threadMentionCount, threadUnreads}: UnreadObserverArgs): void;
|
||||
}
|
||||
|
||||
export const subscribeServerUnreadAndMentions = (serverUrl: string, observer: UnreadObserver) => {
|
||||
@@ -39,8 +40,8 @@ export const subscribeServerUnreadAndMentions = (serverUrl: string, observer: Un
|
||||
observeWithColumns(['is_unread', 'mentions_count']).
|
||||
pipe(
|
||||
combineLatestWith(observeAllMyChannelNotifyProps(server.database)),
|
||||
combineLatestWith(observeThreadMentionCount(server.database, undefined, false)),
|
||||
map$(([[myChannels, settings], threadMentionCount]) => ({myChannels, settings, threadMentionCount})),
|
||||
combineLatestWith(observeUnreadsAndMentionsInTeam(server.database, undefined, false)),
|
||||
map$(([[myChannels, settings], {unreads, mentions}]) => ({myChannels, settings, threadUnreads: unreads, threadMentionCount: mentions})),
|
||||
).
|
||||
subscribe(observer);
|
||||
}
|
||||
@@ -77,8 +78,8 @@ export const subscribeUnreadAndMentionsByServer = (serverUrl: string, observer:
|
||||
observeWithColumns(['mentions_count', 'is_unread']).
|
||||
pipe(
|
||||
combineLatestWith(observeAllMyChannelNotifyProps(server.database)),
|
||||
combineLatestWith(observeThreadMentionCount(server.database, undefined, false)),
|
||||
map$(([[myChannels, settings], threadMentionCount]) => ({myChannels, settings, threadMentionCount})),
|
||||
combineLatestWith(observeUnreadsAndMentionsInTeam(server.database, undefined, false)),
|
||||
map$(([[myChannels, settings], {unreads, mentions}]) => ({myChannels, settings, threadUnreads: unreads, threadMentionCount: mentions})),
|
||||
).
|
||||
subscribe(observer.bind(undefined, serverUrl));
|
||||
}
|
||||
|
||||
@@ -66,15 +66,15 @@ export const observeTeamIdByThread = (thread: ThreadModel) => {
|
||||
);
|
||||
};
|
||||
|
||||
export const observeUnreadsAndMentionsInTeam = (database: Database, teamId?: string, includeDmGm?: boolean): Observable<{unreads: number; mentions: number}> => {
|
||||
export const observeUnreadsAndMentionsInTeam = (database: Database, teamId?: string, includeDmGm?: boolean): Observable<{unreads: boolean; mentions: number}> => {
|
||||
const observeThreads = () => queryThreads(database, teamId, true, includeDmGm).
|
||||
observeWithColumns(['unread_replies', 'unread_mentions']).
|
||||
pipe(
|
||||
switchMap((threads) => {
|
||||
let unreads = 0;
|
||||
let unreads = false;
|
||||
let mentions = 0;
|
||||
for (const thread of threads) {
|
||||
unreads += thread.unreadReplies;
|
||||
unreads = unreads || Boolean(thread.unreadReplies);
|
||||
mentions += thread.unreadMentions;
|
||||
}
|
||||
|
||||
@@ -83,14 +83,16 @@ export const observeUnreadsAndMentionsInTeam = (database: Database, teamId?: str
|
||||
);
|
||||
|
||||
return observeIsCRTEnabled(database).pipe(
|
||||
switchMap((hasCRT) => (hasCRT ? observeThreads() : of$({unreads: 0, mentions: 0}))),
|
||||
switchMap((hasCRT) => (hasCRT ? observeThreads() : of$({unreads: false, mentions: 0}))),
|
||||
distinctUntilChanged((x, y) => x.mentions === y.mentions && x.unreads === y.unreads),
|
||||
);
|
||||
};
|
||||
|
||||
// On receiving "posts", Save the "root posts" as "threads"
|
||||
export const prepareThreadsFromReceivedPosts = async (operator: ServerDataOperator, posts: Post[]) => {
|
||||
export const prepareThreadsFromReceivedPosts = async (operator: ServerDataOperator, posts: Post[], updateLastFetchAt: boolean) => {
|
||||
const models: Model[] = [];
|
||||
const threads: Thread[] = [];
|
||||
const threads: ThreadWithLastFetchedAt[] = [];
|
||||
const toUpdate: {[rootId: string]: number | undefined} = {};
|
||||
posts.forEach((post: Post) => {
|
||||
if (!post.root_id && post.type === '') {
|
||||
threads.push({
|
||||
@@ -99,13 +101,28 @@ export const prepareThreadsFromReceivedPosts = async (operator: ServerDataOperat
|
||||
reply_count: post.reply_count,
|
||||
last_reply_at: post.last_reply_at,
|
||||
is_following: post.is_following,
|
||||
} as Thread);
|
||||
lastFetchedAt: post.create_at,
|
||||
} as ThreadWithLastFetchedAt);
|
||||
} else if (post.root_id && updateLastFetchAt) {
|
||||
toUpdate[post.root_id] = Math.max(toUpdate[post.root_id] || 0, post.create_at, post.update_at, post.delete_at);
|
||||
}
|
||||
});
|
||||
if (threads.length) {
|
||||
const threadModels = await operator.handleThreads({threads, prepareRecordsOnly: true});
|
||||
models.push(...threadModels);
|
||||
}
|
||||
const toUpdateKeys = Object.keys(toUpdate);
|
||||
if (toUpdateKeys.length) {
|
||||
const toUpdateThreads = await Promise.all(toUpdateKeys.map((key) => getThreadById(operator.database, key)));
|
||||
for (const thread of toUpdateThreads) {
|
||||
if (thread) {
|
||||
const model = thread.prepareUpdate((record) => {
|
||||
record.lastFetchedAt = Math.max(record.lastFetchedAt, toUpdate[thread.id] || 0);
|
||||
});
|
||||
models.push(model);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return models;
|
||||
};
|
||||
|
||||
@@ -23,7 +23,7 @@ describe('components/channel_list/categories/body', () => {
|
||||
unreadChannels={[]}
|
||||
onChannelSwitch={() => undefined}
|
||||
onlyUnreads={false}
|
||||
unreadThreads={{unreads: 0, mentions: 0}}
|
||||
unreadThreads={{unreads: false, mentions: 0}}
|
||||
/>,
|
||||
{database},
|
||||
);
|
||||
|
||||
@@ -35,7 +35,7 @@ type UnreadCategoriesProps = {
|
||||
onChannelSwitch: (channelId: string) => void;
|
||||
onlyUnreads: boolean;
|
||||
unreadChannels: ChannelModel[];
|
||||
unreadThreads: {unreads: number; mentions: number};
|
||||
unreadThreads: {unreads: boolean; mentions: number};
|
||||
}
|
||||
|
||||
const extractKey = (item: ChannelModel) => item.id;
|
||||
@@ -62,7 +62,7 @@ const UnreadCategories = ({onChannelSwitch, onlyUnreads, unreadChannels, unreadT
|
||||
<Empty onlyUnreads={onlyUnreads}/>
|
||||
) : undefined;
|
||||
|
||||
if (!unreadChannels.length && !unreadThreads.mentions && !unreadThreads.mentions && !onlyUnreads) {
|
||||
if (!unreadChannels.length && !unreadThreads.mentions && !unreadThreads.unreads && !onlyUnreads) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ describe('Thread item in the channel list', () => {
|
||||
currentChannelId='someChannelId'
|
||||
onlyUnreads={false}
|
||||
unreadsAndMentions={{
|
||||
unreads: 0,
|
||||
unreads: false,
|
||||
mentions: 0,
|
||||
}}
|
||||
/>,
|
||||
@@ -29,7 +29,7 @@ describe('Thread item in the channel list', () => {
|
||||
currentChannelId='someChannelId'
|
||||
onlyUnreads={true}
|
||||
unreadsAndMentions={{
|
||||
unreads: 0,
|
||||
unreads: false,
|
||||
mentions: 0,
|
||||
}}
|
||||
/>,
|
||||
|
||||
@@ -39,7 +39,7 @@ type Props = {
|
||||
currentChannelId: string;
|
||||
onlyUnreads: boolean;
|
||||
unreadsAndMentions: {
|
||||
unreads: number;
|
||||
unreads: boolean;
|
||||
mentions: number;
|
||||
};
|
||||
};
|
||||
|
||||
@@ -69,11 +69,11 @@ const Servers = React.forwardRef<ServersRef>((props, ref) => {
|
||||
setTotal({mentions, unread});
|
||||
};
|
||||
|
||||
const unreadsSubscription = (serverUrl: string, {myChannels, settings, threadMentionCount}: UnreadObserverArgs) => {
|
||||
const unreadsSubscription = (serverUrl: string, {myChannels, settings, threadMentionCount, threadUnreads}: UnreadObserverArgs) => {
|
||||
const unreads = subscriptions.get(serverUrl);
|
||||
if (unreads) {
|
||||
let mentions = 0;
|
||||
let unread = false;
|
||||
let unread = Boolean(threadUnreads);
|
||||
for (const myChannel of myChannels) {
|
||||
const isMuted = settings?.[myChannel.id]?.mark_unread === 'mention';
|
||||
mentions += myChannel.mentionsCount;
|
||||
@@ -101,14 +101,14 @@ const Servers = React.forwardRef<ServersRef>((props, ref) => {
|
||||
|
||||
for (const server of servers) {
|
||||
const {lastActiveAt, url} = server;
|
||||
if (lastActiveAt && url !== currentServerUrl && !subscriptions.has(url)) {
|
||||
if (lastActiveAt && (url !== currentServerUrl) && !subscriptions.has(url)) {
|
||||
const unreads: UnreadSubscription = {
|
||||
mentions: 0,
|
||||
unread: false,
|
||||
};
|
||||
subscriptions.set(url, unreads);
|
||||
unreads.subscription = subscribeUnreadAndMentionsByServer(url, unreadsSubscription);
|
||||
} else if ((!lastActiveAt || url === currentServerUrl) && subscriptions.has(url)) {
|
||||
} else if ((!lastActiveAt || (url === currentServerUrl)) && subscriptions.has(url)) {
|
||||
subscriptions.get(url)?.subscription?.unsubscribe();
|
||||
subscriptions.delete(url);
|
||||
updateTotal();
|
||||
|
||||
@@ -165,9 +165,9 @@ const ServerItem = ({
|
||||
displayName = intl.formatMessage({id: 'servers.default', defaultMessage: 'Default Server'});
|
||||
}
|
||||
|
||||
const unreadsSubscription = ({myChannels, settings, threadMentionCount}: UnreadObserverArgs) => {
|
||||
const unreadsSubscription = ({myChannels, settings, threadMentionCount, threadUnreads}: UnreadObserverArgs) => {
|
||||
let mentions = 0;
|
||||
let isUnread = false;
|
||||
let isUnread = Boolean(threadUnreads);
|
||||
for (const myChannel of myChannels) {
|
||||
const isMuted = settings?.[myChannel.id]?.mark_unread === 'mention';
|
||||
mentions += myChannel.mentionsCount;
|
||||
|
||||
@@ -78,3 +78,10 @@ export const processPostsFetched = (data: PostResponse) => {
|
||||
previousPostId,
|
||||
};
|
||||
};
|
||||
|
||||
export const getLastFetchedAtFromPosts = (posts?: Post[]) => {
|
||||
return posts?.reduce((timestamp: number, p) => {
|
||||
const maxTimestamp = Math.max(p.create_at, p.update_at, p.delete_at);
|
||||
return Math.max(maxTimestamp, timestamp);
|
||||
}, 0) || 0;
|
||||
};
|
||||
|
||||
@@ -104,7 +104,7 @@ struct ThreadSetters {
|
||||
}
|
||||
|
||||
extension Database {
|
||||
public func queryPostsSinceForChannel(withId channelId: String, withServerUrl serverUrl: String) throws -> Int64? {
|
||||
public func queryLastPostCreateAt(withId channelId: String, withServerUrl serverUrl: String) throws -> Int64? {
|
||||
let db = try getDatabaseForServer(serverUrl)
|
||||
|
||||
let earliestCol = Expression<Int64>("earliest")
|
||||
@@ -116,7 +116,6 @@ extension Database {
|
||||
.order(latestCol.desc)
|
||||
.limit(1)
|
||||
|
||||
|
||||
var earliest: Int64?
|
||||
var latest: Int64?
|
||||
if let result = try? db.pluck(earliestLatestQuery) {
|
||||
@@ -144,24 +143,39 @@ extension Database {
|
||||
return nil
|
||||
}
|
||||
|
||||
private func queryPostsInChannelEarliestAndLatest(_ serverUrl: String, _ channelId: String) throws -> (Int64, Int64) {
|
||||
public func queryPostsSinceForChannel(withId channelId: String, withServerUrl serverUrl: String) throws -> Int64? {
|
||||
let db = try getDatabaseForServer(serverUrl)
|
||||
let earliest = Expression<Int64>("earliest")
|
||||
let latest = Expression<Int64>("latest")
|
||||
let id = Expression<String>("channel_id")
|
||||
let query = postsInChannelTable
|
||||
.select(earliest, latest)
|
||||
.where(id == channelId)
|
||||
.order(latest.desc)
|
||||
.limit(1)
|
||||
|
||||
|
||||
for result in try db.prepare(query) {
|
||||
return (try result.get(earliest),
|
||||
try result.get(latest))
|
||||
let idCol = Expression<String>("id")
|
||||
let lastFetchedAtColAsDouble = Expression<Double?>("last_fetched_at")
|
||||
let lastFetchedAtColAsInt64 = Expression<Int64?>("last_fetched_at")
|
||||
let query = myChannelTable.where(idCol == channelId)
|
||||
|
||||
if let result = try? db.pluck(query) {
|
||||
let lastFetchAtInt64 = result[lastFetchedAtColAsInt64]
|
||||
if lastFetchAtInt64 != nil,
|
||||
lastFetchAtInt64 > 0 {
|
||||
return lastFetchAtInt64
|
||||
}
|
||||
if let last = result[lastFetchedAtColAsDouble],
|
||||
last > 0 {
|
||||
return Int64(last)
|
||||
}
|
||||
}
|
||||
|
||||
return try queryLastPostCreateAt(withId: channelId, withServerUrl: serverUrl)
|
||||
}
|
||||
|
||||
private func updateMyChannelLastFetchedAt(_ db: Connection, _ channelId: String, _ latest: Int64) throws {
|
||||
let idCol = Expression<String>("id")
|
||||
let lastFetchedAtCol = Expression<Int64>("last_fetched_at")
|
||||
let statusCol = Expression<String>("_status")
|
||||
|
||||
return (0, 0)
|
||||
let query = myChannelTable
|
||||
.where(idCol == channelId)
|
||||
.update(lastFetchedAtCol <- latest, statusCol <- "updated")
|
||||
|
||||
try db.run(query)
|
||||
}
|
||||
|
||||
public func handlePostData(_ db: Connection, _ postData: PostData, _ channelId: String, _ usedSince: Bool = false, _ receivingThreads: Bool = false) throws {
|
||||
@@ -171,6 +185,9 @@ extension Database {
|
||||
let latest = sortedChainedPosts.last!.create_at
|
||||
if (!receivingThreads) {
|
||||
try handlePostsInChannel(db, channelId, earliest, latest, usedSince)
|
||||
|
||||
let lastFetchedAt = postData.posts.map({max($0.create_at, $0.update_at, $0.delete_at)}).max()
|
||||
try updateMyChannelLastFetchedAt(db, channelId, lastFetchedAt ?? 0)
|
||||
}
|
||||
try handlePostsInThread(db, postData.posts)
|
||||
}
|
||||
@@ -276,9 +293,9 @@ extension Database {
|
||||
|
||||
let deleteQuery = postsInChannelTable
|
||||
.where(idCol != id &&
|
||||
channelIdCol == channelId &&
|
||||
earliestCol >= earliest &&
|
||||
latestCol <= latest)
|
||||
channelIdCol == channelId &&
|
||||
earliestCol >= earliest &&
|
||||
latestCol <= latest)
|
||||
.delete()
|
||||
|
||||
try db.run(deleteQuery)
|
||||
@@ -452,11 +469,11 @@ extension Database {
|
||||
for emoji in emojis {
|
||||
if let e = emoji as? [String: Any] {
|
||||
var emojiSetter = [Setter]()
|
||||
emojiSetter.append(id <- e["id"] as! String)
|
||||
emojiSetter.append(name <- e["name"] as! String)
|
||||
emojiSetter.append(id <- e["id"] as! String)
|
||||
emojiSetter.append(name <- e["name"] as! String)
|
||||
emojiSetter.append(statusCol <- "created")
|
||||
|
||||
emojiSetters.append(emojiSetter)
|
||||
emojiSetters.append(emojiSetter)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -481,6 +498,7 @@ extension Database {
|
||||
let replyCount = Expression<Int>("reply_count")
|
||||
let isFollowing = Expression<Bool>("is_following")
|
||||
let statusCol = Expression<String>("_status")
|
||||
let lastFetchAtCol = Expression<Int64>("last_fetched_at")
|
||||
|
||||
var threadsSetters: [ThreadSetters] = []
|
||||
|
||||
@@ -505,6 +523,7 @@ extension Database {
|
||||
setter.append(lastReplyAt <- post.last_reply_at)
|
||||
setter.append(replyCount <- post.reply_count)
|
||||
setter.append(isFollowing <- post.is_following)
|
||||
setter.append(lastFetchAtCol <- 0)
|
||||
setter.append(statusCol <- "created")
|
||||
|
||||
let threadSetter = ThreadSetters(
|
||||
|
||||
4
types/api/threads.d.ts
vendored
4
types/api/threads.d.ts
vendored
@@ -14,6 +14,10 @@ type Thread = {
|
||||
delete_at: number;
|
||||
};
|
||||
|
||||
type ThreadWithLastFetchedAt = Thread & {
|
||||
lastFetchedAt: number;
|
||||
}
|
||||
|
||||
type ThreadParticipant = {
|
||||
id: $ID<User>;
|
||||
thread_id: $ID<Thread>;
|
||||
|
||||
2
types/database/database.d.ts
vendored
2
types/database/database.d.ts
vendored
@@ -89,7 +89,7 @@ export type HandlePostsArgs = {
|
||||
};
|
||||
|
||||
export type HandleThreadsArgs = {
|
||||
threads?: Thread[];
|
||||
threads?: ThreadWithLastFetchedAt[];
|
||||
prepareRecordsOnly?: boolean;
|
||||
teamId?: string;
|
||||
loadedInGlobalThreads?: boolean;
|
||||
|
||||
@@ -17,6 +17,9 @@ export default class MyChannelModel extends Model {
|
||||
/** last_post_at : The timestamp for any last post on this channel */
|
||||
lastPostAt: number;
|
||||
|
||||
/** last_fetched_at : The timestamp when we successfully last fetched post on this channel */
|
||||
lastFetchedAt: number;
|
||||
|
||||
/** last_viewed_at : The timestamp showing the user's last viewed post on this channel */
|
||||
lastViewedAt: number;
|
||||
|
||||
@@ -43,4 +46,6 @@ export default class MyChannelModel extends Model {
|
||||
|
||||
/** settings: User specific settings/preferences for this channel */
|
||||
settings: Relation<MyChannelSettingsModel>;
|
||||
|
||||
resetPreparedState: () => void;
|
||||
}
|
||||
|
||||
3
types/database/models/servers/thread.d.ts
vendored
3
types/database/models/servers/thread.d.ts
vendored
@@ -20,6 +20,9 @@ export default class ThreadModel extends Model {
|
||||
/** lastReplyAt : The timestamp of when user last replied to the thread. */
|
||||
lastReplyAt: number;
|
||||
|
||||
/** last_last_fetched_at_at : The timestamp when we successfully last fetched post on this channel */
|
||||
lastFetchedAt: number;
|
||||
|
||||
/** lastViewedAt : The timestamp of when user last viewed the thread. */
|
||||
lastViewedAt: number;
|
||||
|
||||
|
||||
2
types/database/raw_values.d.ts
vendored
2
types/database/raw_values.d.ts
vendored
@@ -116,7 +116,7 @@ type RawValue =
|
||||
| TeamChannelHistory
|
||||
| TeamMembership
|
||||
| TeamSearchHistory
|
||||
| Thread
|
||||
| ThreadWithLastFetchedAt
|
||||
| ThreadInTeam
|
||||
| ThreadParticipant
|
||||
| UserProfile
|
||||
|
||||
Reference in New Issue
Block a user