mas_storage_pg/
app_session.rs

1// Copyright 2024, 2025 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
5// Please see LICENSE files in the repository root for full details.
6
7//! A module containing PostgreSQL implementation of repositories for sessions
8
9use async_trait::async_trait;
10use mas_data_model::{
11    Clock, CompatSession, CompatSessionState, Device, Session, SessionState, User,
12};
13use mas_storage::{
14    Page, Pagination,
15    app_session::{AppSession, AppSessionFilter, AppSessionRepository, AppSessionState},
16    compat::CompatSessionFilter,
17    oauth2::OAuth2SessionFilter,
18};
19use oauth2_types::scope::{Scope, ScopeToken};
20use opentelemetry_semantic_conventions::trace::DB_QUERY_TEXT;
21use sea_query::{
22    Alias, ColumnRef, CommonTableExpression, Expr, PostgresQueryBuilder, Query, UnionType,
23};
24use sea_query_binder::SqlxBinder;
25use sqlx::PgConnection;
26use tracing::Instrument;
27use ulid::Ulid;
28use uuid::Uuid;
29
30use crate::{
31    DatabaseError, ExecuteExt,
32    errors::DatabaseInconsistencyError,
33    filter::StatementExt,
34    iden::{CompatSessions, OAuth2Sessions},
35    pagination::QueryBuilderExt,
36};
37
38/// An implementation of [`AppSessionRepository`] for a PostgreSQL connection
39pub struct PgAppSessionRepository<'c> {
40    conn: &'c mut PgConnection,
41}
42
43impl<'c> PgAppSessionRepository<'c> {
44    /// Create a new [`PgAppSessionRepository`] from an active PostgreSQL
45    /// connection
46    pub fn new(conn: &'c mut PgConnection) -> Self {
47        Self { conn }
48    }
49}
50
51mod priv_ {
52    // The enum_def macro generates a public enum, which we don't want, because it
53    // triggers the missing docs warning
54
55    use std::net::IpAddr;
56
57    use chrono::{DateTime, Utc};
58    use mas_storage::pagination::Node;
59    use sea_query::enum_def;
60    use ulid::Ulid;
61    use uuid::Uuid;
62
63    #[derive(sqlx::FromRow)]
64    #[enum_def]
65    pub(super) struct AppSessionLookup {
66        pub(super) cursor: Uuid,
67        pub(super) compat_session_id: Option<Uuid>,
68        pub(super) oauth2_session_id: Option<Uuid>,
69        pub(super) oauth2_client_id: Option<Uuid>,
70        pub(super) user_session_id: Option<Uuid>,
71        pub(super) user_id: Option<Uuid>,
72        pub(super) scope_list: Option<Vec<String>>,
73        pub(super) device_id: Option<String>,
74        pub(super) human_name: Option<String>,
75        pub(super) created_at: DateTime<Utc>,
76        pub(super) finished_at: Option<DateTime<Utc>>,
77        pub(super) is_synapse_admin: Option<bool>,
78        pub(super) user_agent: Option<String>,
79        pub(super) last_active_at: Option<DateTime<Utc>>,
80        pub(super) last_active_ip: Option<IpAddr>,
81    }
82
83    impl Node<Ulid> for AppSessionLookup {
84        fn cursor(&self) -> Ulid {
85            self.cursor.into()
86        }
87    }
88}
89
90use priv_::{AppSessionLookup, AppSessionLookupIden};
91
92impl TryFrom<AppSessionLookup> for AppSession {
93    type Error = DatabaseError;
94
95    fn try_from(value: AppSessionLookup) -> Result<Self, Self::Error> {
96        // This is annoying to do, but we have to match on all the fields to determine
97        // whether it's a compat session or an oauth2 session
98        let AppSessionLookup {
99            cursor,
100            compat_session_id,
101            oauth2_session_id,
102            oauth2_client_id,
103            user_session_id,
104            user_id,
105            scope_list,
106            device_id,
107            human_name,
108            created_at,
109            finished_at,
110            is_synapse_admin,
111            user_agent,
112            last_active_at,
113            last_active_ip,
114        } = value;
115
116        let user_session_id = user_session_id.map(Ulid::from);
117
118        match (
119            compat_session_id,
120            oauth2_session_id,
121            oauth2_client_id,
122            user_id,
123            scope_list,
124            device_id,
125            is_synapse_admin,
126        ) {
127            (
128                Some(compat_session_id),
129                None,
130                None,
131                Some(user_id),
132                None,
133                device_id_opt,
134                Some(is_synapse_admin),
135            ) => {
136                let id = compat_session_id.into();
137                let device = device_id_opt
138                    .map(Device::try_from)
139                    .transpose()
140                    .map_err(|e| {
141                        DatabaseInconsistencyError::on("compat_sessions")
142                            .column("device_id")
143                            .row(id)
144                            .source(e)
145                    })?;
146
147                let state = match finished_at {
148                    None => CompatSessionState::Valid,
149                    Some(finished_at) => CompatSessionState::Finished { finished_at },
150                };
151
152                let session = CompatSession {
153                    id,
154                    state,
155                    user_id: user_id.into(),
156                    device,
157                    human_name,
158                    user_session_id,
159                    created_at,
160                    is_synapse_admin,
161                    user_agent,
162                    last_active_at,
163                    last_active_ip,
164                };
165
166                Ok(AppSession::Compat(Box::new(session)))
167            }
168
169            (
170                None,
171                Some(oauth2_session_id),
172                Some(oauth2_client_id),
173                user_id,
174                Some(scope_list),
175                None,
176                None,
177            ) => {
178                let id = oauth2_session_id.into();
179                let scope: Result<Scope, _> =
180                    scope_list.iter().map(|s| s.parse::<ScopeToken>()).collect();
181                let scope = scope.map_err(|e| {
182                    DatabaseInconsistencyError::on("oauth2_sessions")
183                        .column("scope")
184                        .row(id)
185                        .source(e)
186                })?;
187
188                let state = match value.finished_at {
189                    None => SessionState::Valid,
190                    Some(finished_at) => SessionState::Finished { finished_at },
191                };
192
193                let session = Session {
194                    id,
195                    state,
196                    created_at,
197                    client_id: oauth2_client_id.into(),
198                    user_id: user_id.map(Ulid::from),
199                    user_session_id,
200                    scope,
201                    user_agent,
202                    last_active_at,
203                    last_active_ip,
204                    human_name,
205                };
206
207                Ok(AppSession::OAuth2(Box::new(session)))
208            }
209
210            _ => Err(DatabaseInconsistencyError::on("sessions")
211                .row(cursor.into())
212                .into()),
213        }
214    }
215}
216
217/// Split a [`AppSessionFilter`] into two separate filters: a
218/// [`CompatSessionFilter`] and an [`OAuth2SessionFilter`].
219fn split_filter(
220    filter: AppSessionFilter<'_>,
221) -> (CompatSessionFilter<'_>, OAuth2SessionFilter<'_>) {
222    let mut compat_filter = CompatSessionFilter::new();
223    let mut oauth2_filter = OAuth2SessionFilter::new();
224
225    if let Some(user) = filter.user() {
226        compat_filter = compat_filter.for_user(user);
227        oauth2_filter = oauth2_filter.for_user(user);
228    }
229
230    match filter.state() {
231        Some(AppSessionState::Active) => {
232            compat_filter = compat_filter.active_only();
233            oauth2_filter = oauth2_filter.active_only();
234        }
235        Some(AppSessionState::Finished) => {
236            compat_filter = compat_filter.finished_only();
237            oauth2_filter = oauth2_filter.finished_only();
238        }
239        None => {}
240    }
241
242    if let Some(device) = filter.device() {
243        compat_filter = compat_filter.for_device(device);
244        oauth2_filter = oauth2_filter.for_device(device);
245    }
246
247    if let Some(browser_session) = filter.browser_session() {
248        compat_filter = compat_filter.for_browser_session(browser_session);
249        oauth2_filter = oauth2_filter.for_browser_session(browser_session);
250    }
251
252    if let Some(last_active_before) = filter.last_active_before() {
253        compat_filter = compat_filter.with_last_active_before(last_active_before);
254        oauth2_filter = oauth2_filter.with_last_active_before(last_active_before);
255    }
256
257    if let Some(last_active_after) = filter.last_active_after() {
258        compat_filter = compat_filter.with_last_active_after(last_active_after);
259        oauth2_filter = oauth2_filter.with_last_active_after(last_active_after);
260    }
261
262    (compat_filter, oauth2_filter)
263}
264
265#[async_trait]
266impl AppSessionRepository for PgAppSessionRepository<'_> {
267    type Error = DatabaseError;
268
269    #[tracing::instrument(
270        name = "db.app_session.list",
271        fields(
272            db.query.text,
273        ),
274        skip_all,
275        err,
276    )]
277    async fn list(
278        &mut self,
279        filter: AppSessionFilter<'_>,
280        pagination: Pagination,
281    ) -> Result<Page<AppSession>, Self::Error> {
282        let (compat_filter, oauth2_filter) = split_filter(filter);
283
284        let mut oauth2_session_select = Query::select()
285            .expr_as(
286                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
287                AppSessionLookupIden::Cursor,
288            )
289            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::CompatSessionId)
290            .expr_as(
291                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
292                AppSessionLookupIden::Oauth2SessionId,
293            )
294            .expr_as(
295                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId)),
296                AppSessionLookupIden::Oauth2ClientId,
297            )
298            .expr_as(
299                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserSessionId)),
300                AppSessionLookupIden::UserSessionId,
301            )
302            .expr_as(
303                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserId)),
304                AppSessionLookupIden::UserId,
305            )
306            .expr_as(
307                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::ScopeList)),
308                AppSessionLookupIden::ScopeList,
309            )
310            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::DeviceId)
311            .expr_as(
312                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::HumanName)),
313                AppSessionLookupIden::HumanName,
314            )
315            .expr_as(
316                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::CreatedAt)),
317                AppSessionLookupIden::CreatedAt,
318            )
319            .expr_as(
320                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::FinishedAt)),
321                AppSessionLookupIden::FinishedAt,
322            )
323            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::IsSynapseAdmin)
324            .expr_as(
325                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserAgent)),
326                AppSessionLookupIden::UserAgent,
327            )
328            .expr_as(
329                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveAt)),
330                AppSessionLookupIden::LastActiveAt,
331            )
332            .expr_as(
333                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveIp)),
334                AppSessionLookupIden::LastActiveIp,
335            )
336            .from(OAuth2Sessions::Table)
337            .apply_filter(oauth2_filter)
338            .clone();
339
340        let compat_session_select = Query::select()
341            .expr_as(
342                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
343                AppSessionLookupIden::Cursor,
344            )
345            .expr_as(
346                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
347                AppSessionLookupIden::CompatSessionId,
348            )
349            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2SessionId)
350            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2ClientId)
351            .expr_as(
352                Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)),
353                AppSessionLookupIden::UserSessionId,
354            )
355            .expr_as(
356                Expr::col((CompatSessions::Table, CompatSessions::UserId)),
357                AppSessionLookupIden::UserId,
358            )
359            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::ScopeList)
360            .expr_as(
361                Expr::col((CompatSessions::Table, CompatSessions::DeviceId)),
362                AppSessionLookupIden::DeviceId,
363            )
364            .expr_as(
365                Expr::col((CompatSessions::Table, CompatSessions::HumanName)),
366                AppSessionLookupIden::HumanName,
367            )
368            .expr_as(
369                Expr::col((CompatSessions::Table, CompatSessions::CreatedAt)),
370                AppSessionLookupIden::CreatedAt,
371            )
372            .expr_as(
373                Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)),
374                AppSessionLookupIden::FinishedAt,
375            )
376            .expr_as(
377                Expr::col((CompatSessions::Table, CompatSessions::IsSynapseAdmin)),
378                AppSessionLookupIden::IsSynapseAdmin,
379            )
380            .expr_as(
381                Expr::col((CompatSessions::Table, CompatSessions::UserAgent)),
382                AppSessionLookupIden::UserAgent,
383            )
384            .expr_as(
385                Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt)),
386                AppSessionLookupIden::LastActiveAt,
387            )
388            .expr_as(
389                Expr::col((CompatSessions::Table, CompatSessions::LastActiveIp)),
390                AppSessionLookupIden::LastActiveIp,
391            )
392            .from(CompatSessions::Table)
393            .apply_filter(compat_filter)
394            .clone();
395
396        let common_table_expression = CommonTableExpression::new()
397            .query(
398                oauth2_session_select
399                    .union(UnionType::All, compat_session_select)
400                    .clone(),
401            )
402            .table_name(Alias::new("sessions"))
403            .clone();
404
405        let with_clause = Query::with().cte(common_table_expression).clone();
406
407        let select = Query::select()
408            .column(ColumnRef::Asterisk)
409            .from(Alias::new("sessions"))
410            .generate_pagination(AppSessionLookupIden::Cursor, pagination)
411            .clone();
412
413        let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);
414
415        let edges: Vec<AppSessionLookup> = sqlx::query_as_with(&sql, arguments)
416            .traced()
417            .fetch_all(&mut *self.conn)
418            .await?;
419
420        let page = pagination.process(edges).try_map(TryFrom::try_from)?;
421
422        Ok(page)
423    }
424
425    #[tracing::instrument(
426        name = "db.app_session.count",
427        fields(
428            db.query.text,
429        ),
430        skip_all,
431        err,
432    )]
433    async fn count(&mut self, filter: AppSessionFilter<'_>) -> Result<usize, Self::Error> {
434        let (compat_filter, oauth2_filter) = split_filter(filter);
435        let mut oauth2_session_select = Query::select()
436            .expr(Expr::cust("1"))
437            .from(OAuth2Sessions::Table)
438            .apply_filter(oauth2_filter)
439            .clone();
440
441        let compat_session_select = Query::select()
442            .expr(Expr::cust("1"))
443            .from(CompatSessions::Table)
444            .apply_filter(compat_filter)
445            .clone();
446
447        let common_table_expression = CommonTableExpression::new()
448            .query(
449                oauth2_session_select
450                    .union(UnionType::All, compat_session_select)
451                    .clone(),
452            )
453            .table_name(Alias::new("sessions"))
454            .clone();
455
456        let with_clause = Query::with().cte(common_table_expression).clone();
457
458        let select = Query::select()
459            .expr(Expr::cust("COUNT(*)"))
460            .from(Alias::new("sessions"))
461            .clone();
462
463        let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);
464
465        let count: i64 = sqlx::query_scalar_with(&sql, arguments)
466            .traced()
467            .fetch_one(&mut *self.conn)
468            .await?;
469
470        count
471            .try_into()
472            .map_err(DatabaseError::to_invalid_operation)
473    }
474
475    #[tracing::instrument(
476        name = "db.app_session.finish_sessions_to_replace_device",
477        fields(
478            db.query.text,
479            %user.id,
480            %device_id = device.as_str()
481        ),
482        skip_all,
483        err,
484    )]
485    async fn finish_sessions_to_replace_device(
486        &mut self,
487        clock: &dyn Clock,
488        user: &User,
489        device: &Device,
490    ) -> Result<(), Self::Error> {
491        // TODO need to invoke this from all the oauth2 login sites
492        let span = tracing::info_span!(
493            "db.app_session.finish_sessions_to_replace_device.compat_sessions",
494            { DB_QUERY_TEXT } = tracing::field::Empty,
495        );
496        let finished_at = clock.now();
497        sqlx::query!(
498            "
499                UPDATE compat_sessions SET finished_at = $3 WHERE user_id = $1 AND device_id = $2 AND finished_at IS NULL
500            ",
501            Uuid::from(user.id),
502            device.as_str(),
503            finished_at
504        )
505        .record(&span)
506        .execute(&mut *self.conn)
507        .instrument(span)
508        .await?;
509
510        if let Ok([stable_device_as_scope_token, unstable_device_as_scope_token]) =
511            device.to_scope_token()
512        {
513            let span = tracing::info_span!(
514                "db.app_session.finish_sessions_to_replace_device.oauth2_sessions",
515                { DB_QUERY_TEXT } = tracing::field::Empty,
516            );
517            sqlx::query!(
518                "
519                    UPDATE oauth2_sessions
520                    SET finished_at = $4
521                    WHERE user_id = $1
522                      AND ($2 = ANY(scope_list) OR $3 = ANY(scope_list))
523                      AND finished_at IS NULL
524                ",
525                Uuid::from(user.id),
526                stable_device_as_scope_token.as_str(),
527                unstable_device_as_scope_token.as_str(),
528                finished_at
529            )
530            .record(&span)
531            .execute(&mut *self.conn)
532            .instrument(span)
533            .await?;
534        }
535
536        Ok(())
537    }
538}
539
540#[cfg(test)]
541mod tests {
542    use chrono::Duration;
543    use mas_data_model::{Device, clock::MockClock};
544    use mas_storage::{
545        Pagination, RepositoryAccess,
546        app_session::{AppSession, AppSessionFilter},
547        oauth2::OAuth2SessionRepository,
548    };
549    use oauth2_types::{
550        requests::GrantType,
551        scope::{OPENID, Scope},
552    };
553    use rand::SeedableRng;
554    use rand_chacha::ChaChaRng;
555    use sqlx::PgPool;
556
557    use crate::PgRepository;
558
559    #[sqlx::test(migrator = "crate::MIGRATOR")]
560    async fn test_app_repo(pool: PgPool) {
561        let mut rng = ChaChaRng::seed_from_u64(42);
562        let clock = MockClock::default();
563        let mut repo = PgRepository::from_pool(&pool).await.unwrap();
564
565        // Create a user
566        let user = repo
567            .user()
568            .add(&mut rng, &clock, "john".to_owned())
569            .await
570            .unwrap();
571
572        let all = AppSessionFilter::new().for_user(&user);
573        let active = all.active_only();
574        let finished = all.finished_only();
575        let pagination = Pagination::first(10);
576
577        assert_eq!(repo.app_session().count(all).await.unwrap(), 0);
578        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
579        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);
580
581        let full_list = repo.app_session().list(all, pagination).await.unwrap();
582        assert!(full_list.edges.is_empty());
583        let active_list = repo.app_session().list(active, pagination).await.unwrap();
584        assert!(active_list.edges.is_empty());
585        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
586        assert!(finished_list.edges.is_empty());
587
588        // Start a compat session for that user
589        let device = Device::generate(&mut rng);
590        let compat_session = repo
591            .compat_session()
592            .add(&mut rng, &clock, &user, device.clone(), None, false, None)
593            .await
594            .unwrap();
595
596        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
597        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
598        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);
599
600        let full_list = repo.app_session().list(all, pagination).await.unwrap();
601        assert_eq!(full_list.edges.len(), 1);
602        assert_eq!(
603            full_list.edges[0].node,
604            AppSession::Compat(Box::new(compat_session.clone()))
605        );
606        let active_list = repo.app_session().list(active, pagination).await.unwrap();
607        assert_eq!(active_list.edges.len(), 1);
608        assert_eq!(
609            active_list.edges[0].node,
610            AppSession::Compat(Box::new(compat_session.clone()))
611        );
612        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
613        assert!(finished_list.edges.is_empty());
614
615        // Finish the session
616        let compat_session = repo
617            .compat_session()
618            .finish(&clock, compat_session)
619            .await
620            .unwrap();
621
622        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
623        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
624        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);
625
626        let full_list = repo.app_session().list(all, pagination).await.unwrap();
627        assert_eq!(full_list.edges.len(), 1);
628        assert_eq!(
629            full_list.edges[0].node,
630            AppSession::Compat(Box::new(compat_session.clone()))
631        );
632        let active_list = repo.app_session().list(active, pagination).await.unwrap();
633        assert!(active_list.edges.is_empty());
634        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
635        assert_eq!(finished_list.edges.len(), 1);
636        assert_eq!(
637            finished_list.edges[0].node,
638            AppSession::Compat(Box::new(compat_session.clone()))
639        );
640
641        // Start an OAuth2 session
642        let client = repo
643            .oauth2_client()
644            .add(
645                &mut rng,
646                &clock,
647                vec!["https://example.com/redirect".parse().unwrap()],
648                None,
649                None,
650                None,
651                vec![GrantType::AuthorizationCode],
652                Some("First client".to_owned()),
653                Some("https://example.com/logo.png".parse().unwrap()),
654                Some("https://example.com/".parse().unwrap()),
655                Some("https://example.com/policy".parse().unwrap()),
656                Some("https://example.com/tos".parse().unwrap()),
657                Some("https://example.com/jwks.json".parse().unwrap()),
658                None,
659                None,
660                None,
661                None,
662                None,
663                Some("https://example.com/login".parse().unwrap()),
664            )
665            .await
666            .unwrap();
667
668        let device2 = Device::generate(&mut rng);
669        let scope: Scope = [OPENID]
670            .into_iter()
671            .chain(device2.to_scope_token().unwrap().into_iter())
672            .collect();
673
674        // We're moving the clock forward by 1 minute between each session to ensure
675        // we're getting consistent ordering in lists.
676        clock.advance(Duration::try_minutes(1).unwrap());
677
678        let oauth_session = repo
679            .oauth2_session()
680            .add(&mut rng, &clock, &client, Some(&user), None, scope)
681            .await
682            .unwrap();
683
684        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
685        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
686        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);
687
688        let full_list = repo.app_session().list(all, pagination).await.unwrap();
689        assert_eq!(full_list.edges.len(), 2);
690        assert_eq!(
691            full_list.edges[0].node,
692            AppSession::Compat(Box::new(compat_session.clone()))
693        );
694        assert_eq!(
695            full_list.edges[1].node,
696            AppSession::OAuth2(Box::new(oauth_session.clone()))
697        );
698
699        let active_list = repo.app_session().list(active, pagination).await.unwrap();
700        assert_eq!(active_list.edges.len(), 1);
701        assert_eq!(
702            active_list.edges[0].node,
703            AppSession::OAuth2(Box::new(oauth_session.clone()))
704        );
705
706        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
707        assert_eq!(finished_list.edges.len(), 1);
708        assert_eq!(
709            finished_list.edges[0].node,
710            AppSession::Compat(Box::new(compat_session.clone()))
711        );
712
713        // Finish the session
714        let oauth_session = repo
715            .oauth2_session()
716            .finish(&clock, oauth_session)
717            .await
718            .unwrap();
719
720        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
721        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
722        assert_eq!(repo.app_session().count(finished).await.unwrap(), 2);
723
724        let full_list = repo.app_session().list(all, pagination).await.unwrap();
725        assert_eq!(full_list.edges.len(), 2);
726        assert_eq!(
727            full_list.edges[0].node,
728            AppSession::Compat(Box::new(compat_session.clone()))
729        );
730        assert_eq!(
731            full_list.edges[1].node,
732            AppSession::OAuth2(Box::new(oauth_session.clone()))
733        );
734
735        let active_list = repo.app_session().list(active, pagination).await.unwrap();
736        assert!(active_list.edges.is_empty());
737
738        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
739        assert_eq!(finished_list.edges.len(), 2);
740        assert_eq!(
741            finished_list.edges[0].node,
742            AppSession::Compat(Box::new(compat_session.clone()))
743        );
744        assert_eq!(
745            full_list.edges[1].node,
746            AppSession::OAuth2(Box::new(oauth_session.clone()))
747        );
748
749        // Query by device
750        let filter = AppSessionFilter::new().for_device(&device);
751        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
752        let list = repo.app_session().list(filter, pagination).await.unwrap();
753        assert_eq!(list.edges.len(), 1);
754        assert_eq!(
755            list.edges[0].node,
756            AppSession::Compat(Box::new(compat_session.clone()))
757        );
758
759        let filter = AppSessionFilter::new().for_device(&device2);
760        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
761        let list = repo.app_session().list(filter, pagination).await.unwrap();
762        assert_eq!(list.edges.len(), 1);
763        assert_eq!(
764            list.edges[0].node,
765            AppSession::OAuth2(Box::new(oauth_session.clone()))
766        );
767
768        // Create a second user
769        let user2 = repo
770            .user()
771            .add(&mut rng, &clock, "alice".to_owned())
772            .await
773            .unwrap();
774
775        // If we list/count for this user, we should get nothing
776        let filter = AppSessionFilter::new().for_user(&user2);
777        assert_eq!(repo.app_session().count(filter).await.unwrap(), 0);
778        let list = repo.app_session().list(filter, pagination).await.unwrap();
779        assert!(list.edges.is_empty());
780    }
781}