# TanaT - Complete Documentation

TanaT (Temporal ANalysis of Trajectories) is a Python library for temporal sequence analysis, focused on patient care pathways. It supports multi-sequence trajectories combining events, intervals, and states.

============================================================

## Core Concepts

This page introduces the fundamental concepts of TanaT's data model. Understanding these concepts is essential for using the library effectively.

Entities, Sequences, and Trajectories

TanaT distinguishes three levels of temporal data structures:

.. list-table::
   :header-rows: 1
   :widths: 20 40 40

   * - Level
     - Description
     - Example
   * - **Entity**
     - A single observation with temporal extent
     - A medical visit, a hospitalization
   * - **Sequence**
     - Collection of entities for one individual
     - All visits of patient P001
   * - **Trajectory**
     - Multiple sequences for one individual
     - Visits + hospitalizations + lab results for P001

Entity

An **entity** is the atomic unit of temporal data. It has:

- **Features**: One or more descriptive attributes (categorical or numerical)
- **Temporal extent**: Either a single timestamp or a time interval

Sequence

A **sequence** is a collection of entities for a single individual. All entities in a sequence share the same type (events, intervals, or states) and the same feature structure.

[Figure: a sequence with 4 event entities. Two events can share the same timestamp (Event A and Event B on Nov 8).]

Trajectory

A **trajectory** combines multiple sequences of different types for the same individual. It can also include **static features** (attributes not tied to time, like birth date or gender).

[Figure: a trajectory with three sequence types.]

Sequence Types

TanaT supports three types of temporal extent:

.. 
list-table::
   :header-rows: 1
   :widths: 20 35 45

   * - Type
     - Temporal Extent
     - Constraints
   * - **Event**
     - Single timestamp (punctual)
     - None
   * - **Interval**
     - Start and end dates
     - Can overlap, gaps allowed
   * - **State**
     - Start and end dates
     - Contiguous, no overlap, no gaps

**When to use each type:**

- **Event**: Point-in-time occurrences (visits, purchases, clicks)
- **Interval**: Duration-based events that can overlap (treatments, projects)
- **State**: Continuous states without gaps (disease stages, employment status)

Pools

A **pool** is a collection of sequences or trajectories from multiple individuals. All items in a pool share the same structure (same features, same temporal type).

    from tanat.sequence import EventSequencePool

    # Create a pool from a DataFrame
    pool = EventSequencePool(data, settings={
        "id_column": "patient_id",
        "time_column": "visit_date",
        "entity_features": ["visit_type"]
    })

    # Access individual sequences
    patient_001 = pool["P001"]

    # Iterate over all sequences
    for sequence in pool:
        print(sequence.id, len(sequence))

Pools are the primary data structure for analysis operations like computing distance matrices or clustering.

Settings

TanaT uses **settings objects** to configure pools, metrics, and other components. This pattern provides:

- Clear separation of configuration from data
- Type validation and defaults
- Reproducibility (settings can be exported/imported)

    from tanat.sequence import EventSequencePool, EventSequenceSettings

    # Explicit settings object
    settings = EventSequenceSettings(
        id_column="patient_id",
        time_column="visit_date",
        entity_features=["visit_type"]
    )
    pool = EventSequencePool(data, settings)

Most TanaT classes have a corresponding settings class (e.g., `DTWSequenceMetric` → `DTWSequenceMetricSettings`).

Updating Settings Safely

**Always use** `update_settings()` to modify configuration after initialization. Direct attribute modification can break internal caching and validation.
    # CORRECT: Use update_settings
    metric.update_settings(window=20)
    metric.update_settings(normalize=True)

    # WRONG: Direct modification bypasses validation
    metric.settings.window = 20  # Don't do this!

This method is available on all TanaT objects with settings (metrics, clusterers, etc.). It ensures proper validation and cache invalidation.

----------------------------------------

## First Steps

This guide walks you through the core TanaT workflow: loading data, choosing the right sequence type, and exploring your temporal data.

.. note::

   Make sure TanaT is installed: `pip install tanat` (see the Installation section).

1. Prepare Your Data

TanaT works with pandas DataFrames containing temporal data:

    import pandas as pd

    # Sample data: patient visits
    data = pd.DataFrame({
        'patient_id': ['P001', 'P001', 'P001', 'P002', 'P002'],
        'visit_date': pd.to_datetime([
            '2023-01-15', '2023-02-20', '2023-03-10',
            '2023-01-20', '2023-03-15'
        ]),
        'visit_type': ['GP', 'SPECIALIST', 'GP', 'GP', 'EMERGENCY']
    })

2. Choose the Right Sequence Type

Before creating a pool, identify which sequence type matches your data:

.. list-table::
   :header-rows: 1
   :widths: 20 40 40

   * - Type
     - Your data has...
     - Example
   * - **EventSequence**
     - Single timestamps (punctual events)
     - Medical visits, purchases, clicks
   * - **IntervalSequence**
     - Start + end dates (can overlap)
     - Treatments, hospital stays, projects
   * - **StateSequence**
     - Contiguous states (no gaps, no overlap)
     - Disease stages, employment status

For our example, visits are **punctual events**, so we use `EventSequencePool`.

3. Create a Sequence Pool

A **pool** groups sequences from multiple individuals:

    from tanat.sequence import EventSequencePool

    pool = EventSequencePool(data, settings={
        "id_column": "patient_id",
        "time_column": "visit_date",
        "entity_features": ["visit_type"]
    })

4. Verify Inferred Metadata

When you display the pool, TanaT shows a summary including **automatically inferred metadata**.
It's important to verify this inference is correct before proceeding:

    # Display pool summary with inferred metadata
    print(pool)

    ┌──────────────────────────────────────────────────┐
    │            EventSequencePool summary             │
    └──────────────────────────────────────────────────┘

    STATISTICS
    ─────────────────────────
    Total sequences    2
    Average length     2.5
    ...

    Metadata:
      Temporal:
        Type: datetime
        Granularity: DAY
      Entity Features (1):
        - visit_type: categorical

You can also get a compact metadata view:

    print(pool.metadata.describe())

If the inference is incorrect, you can update the metadata:

    # Example: correct the timezone
    pool.update_temporal_metadata(timezone="Europe/Paris")

    # Example: specify ordered categories
    pool.update_entity_metadata(
        feature_name="visit_type",
        categories=["GP", "SPECIALIST", "EMERGENCY"],
        ordered=True
    )

5. Access Individual Sequences

    # Get a specific patient's sequence
    patient = pool['P001']
    print(f"Patient P001: {len(patient)} visits")

    # View the underlying data
    print(patient.sequence_data)

6. 
Access Individual Entities

Within a sequence, you can access individual entities (observations):

    # Get the first entity (visit) in the sequence
    first_visit = patient[0]

    # Access entity properties
    print(f"Temporal extent: {first_visit.extent}")  # 2023-01-15 00:00:00
    print(f"Value: {first_visit.value}")  # GP

    # Iterate over all entities
    for entity in patient:
        print(f"{entity.extent}: {entity.value}")

----------------------------------------

## Installation

Using PyPI

Install the released version with `pip`:

    python -m pip install tanat

Using the latest GitLab-hosted version

If you want to get *TanaT*'s latest version, you can install it from the official repository hosted on the Inria GitLab:

    python -m pip install git+https://gitlab.inria.fr/tanat/core/tanat

Dependencies

*TanaT* relies on several foundational libraries from the scientific Python ecosystem, including:

- `pandas` for tabular data handling
- `numpy` and `scipy` for numerical and scientific computing
- `matplotlib` for basic visualization
- `scikit-learn` for machine learning utilities
- `numba` for performance optimization through JIT compilation

In addition, *TanaT* makes use of:

- `scikit-survival` for survival analysis
- `sqlalchemy` for SQL-based data access
- `tqdm` for progress tracking in processing pipelines
- `PyYAML` for configuration handling
- `pypassist`, `tseqmock`, and `tanat_cli_preset` as internal or companion tools for simulation, CLI, and mocking

----------------------------------------

## Alignment

Temporal Alignment

Reference documentation for TanaT's temporal alignment system (T0 management and transformations).

Overview

Temporal alignment enables synchronization of sequences by defining a common reference point (T0) for each sequence. This is essential for:

* Comparative cohort analysis
* Event-aligned studies (e.g., all patients aligned to first hospitalization)
* Longitudinal pattern detection
* Time-to-event analysis

All sequences start with **absolute time** (datetime or timestep).
After setting T0, you can transform to **relative time** or **relative rank**.

Setting Reference Dates (T0)

TanaT provides multiple methods to set reference dates based on different alignment strategies.

.. topic:: zero_from_position()

   Set T0 based on entity position within each sequence.

   .. note::

      This is the default alignment method when no other zeroing is applied (position = 0).

       pool.zero_from_position(position: int = 0) -> self

   **Attributes:**

   - `position` *(int)*: Zero-indexed position (default: 0 = first entity)

   **Example:**

       # Align to first entity (default behavior)
       pool.zero_from_position(position=0)

       # Align to third entity
       pool.zero_from_position(position=2)

       # Align to last entity (use negative indexing)
       pool.zero_from_position(position=-1)

   **Use cases:**

   - First entity alignment: All sequences start at T0 = 0
   - Fixed position analysis: Compare sequences from Nth event
   - Last entity alignment: Retrospective analysis from final event

.. topic:: Direct T0 Assignment

   Manually set reference dates using the `t_zero` property.

       pool.t_zero = {sequence_id: datetime, ...}

   **Example:**

       from datetime import datetime

       # Set custom T0 for specific sequences
       pool.t_zero = {
           "patient_001": datetime(2024, 1, 15),
           "patient_002": datetime(2024, 2, 10),
           "patient_003": datetime(2024, 1, 20)
       }

   **Use cases:**

   - External reference dates (e.g., birth date, diagnosis date from another dataset)
   - Study enrollment dates
   - Custom milestone dates

.. topic:: zero_from_query()

   Set T0 based on the occurrence of specific entities matching a query.

       pool.zero_from_query(
           query: str,
           use_first: bool = True,
           anchor: str = "start/middle/end",
       ) -> self

   **Attributes:**

   - `query` *(str)*: Pandas-style query string to identify reference entities
   - `use_first` *(bool)*: If True, use first matching entity (default: True). If False, use last matching entity.
   - `anchor` *(str)*: Reference point within periods for time calculation. Options: "start", "middle", "end".
   **Example:**

       # Align to first emergency visit
       pool.zero_from_query(
           query="visit_type == 'EMERGENCY'",
           use_first=True
       )

       # Align to last treatment event
       pool.zero_from_query(
           query="status == 'TREATMENT'",
           use_first=False
       )

       # Complex query with multiple conditions
       pool.zero_from_query(
           query="age > 65 and diagnosis == 'DIABETES'"
       )

   .. note::

      Sequences without matching entities will have `None` as T0.

Temporal Transformations

After setting T0, transform absolute time to relative representations.

.. topic:: to_relative_time()

   Convert timestamps to relative time from T0.

       pool.to_relative_time(
           granularity: str = "day",
           drop_na: bool = False
       ) -> pd.DataFrame

   **Attributes:**

   - `granularity` *(str)*: Time unit for relative time

     - Datetime temporal: `"year"`, `"month"`, `"week"`, `"day"`, `"hour"`, `"minute"`, `"second"`
     - Timestep temporal: `"unit"` (raw timestep difference)

   - `drop_na` *(bool)*: If True, remove entities without valid T0

   **Example:**

       # Convert to days from T0 - returns a DataFrame
       df_relative = pool.to_relative_time(granularity="day")
       print(df_relative)
       #              start visit_type diagnosis
       # sequence_id
       # patient_001   -4.0    ROUTINE         A
       # patient_001    0.0  EMERGENCY         B
       # patient_001    5.0   FOLLOWUP         A

       # Convert to hours, excluding sequences without T0
       df_hours = pool.to_relative_time(granularity="hour", drop_na=True)

   **Resulting time values:**

   - Negative values: Events before T0
   - Zero: Events at T0
   - Positive values: Events after T0

.. topic:: to_relative_rank()

   Convert to ordinal positions relative to T0.
       pool.to_relative_rank(drop_na: bool = False) -> pd.DataFrame

   **Attributes:**

   - `drop_na` *(bool)*: If True, remove entities without valid T0

   **Example:**

       # Convert to relative ranks - returns a DataFrame
       df_ranks = pool.to_relative_rank()
       print(df_ranks)
       #              start visit_type diagnosis
       # sequence_id
       # patient_001     -1    ROUTINE         A
       # patient_001      0  EMERGENCY         B
       # patient_001      1   FOLLOWUP         A

       # With missing T0 handling
       df_ranks = pool.to_relative_rank(drop_na=True)

   **Resulting rank values:**

   - Negative ranks: Entities before T0 (-1 = immediately before)
   - Zero: Entity at T0
   - Positive ranks: Entities after T0 (+1 = immediately after)

   **Use cases:**

   - Sequential pattern analysis regardless of time intervals
   - Comparing sequences with different temporal scales
   - Order-based analysis (1st event after T0, 2nd event after T0, etc.)

Workflow Examples

.. topic:: Complete Alignment Workflow

   Typical workflow for temporal alignment and analysis.

       from tanat.sequence import EventSequencePool
       from tanat.criterion import TimeCriterion
       from tanat.visualization.sequence import SequenceVisualizer

       # 1. Set reference dates (T0 = first EMERGENCY visit)
       pool.zero_from_query(
           query="visit_type == 'EMERGENCY'",
           use_first=True
       )

       # 2. Transform to relative time (returns DataFrame)
       relative_data = pool.to_relative_time(granularity="day")

       # 3. Create new aligned pool from relative data
       aligned_pool = EventSequencePool(
           relative_data,
           settings={
               "id_column": "sequence_id",
               "time_column": "start",
               "entity_features": ["visit_type", "diagnosis"]
           }
       )

       # 4. Filter time window around T0 (on aligned pool)
       analysis_window = TimeCriterion(
           start_after=-30,  # 30 days before T0
           end_before=90     # 90 days after T0
       )
       filtered_pool = aligned_pool.filter(analysis_window, level="entity")

       # 5. Visualize aligned sequences
       SequenceVisualizer.timeline().draw(filtered_pool).show()

Accessing T0 Information

.. 
topic:: Inspect T0 Values

   **Check T0 values:**

       # View T0 dictionary
       print(pool.t_zero)
       # Output: {'seq-001': Timestamp(...), 'seq-002': None, ...}

   **Convert to DataFrame:**

       import pandas as pd

       t0_df = pd.DataFrame.from_dict(
           pool.t_zero, orient="index", columns=["T0"]
       )
       t0_df.describe()

Zeroing Configuration

.. topic:: Available Zeroing Strategies

   Advanced configuration using the `zeroing` module (typically for internal use or custom implementations).

   The `tanat.zeroing` module provides three main strategies:

   - **QueryZeroingSetter**: Entity query-based (used by `zero_from_query()`)
   - **PositionZeroingSetter**: Position-based (used by `zero_from_position()`)
   - **DirectZeroingSetter**: Manual assignment (used by `t_zero` property)

   For most use cases, use the pool methods directly rather than instantiating setters manually.

----------------------------------------

## Criterion

Data Filtering Criterion

Reference documentation for TanaT's criterion system for filtering and selecting temporal data.

Overview

Criteria provide a flexible and composable system for filtering sequences, trajectories, and their entities. They enable:

* **Cohort selection**: Extract patient subgroups based on clinical criteria
* **Data cleaning**: Remove invalid or incomplete records
* **Pattern detection**: Find specific temporal patterns
* **Window extraction**: Select time-bounded data segments

All criteria support **method chaining** with filtering levels (entity/sequence/trajectory).

Filtering Levels

TanaT supports three hierarchical filtering levels:

**Entity-level**
   Filters individual records (events, states, intervals) within sequences. Preserves sequence structure but only includes matching entities.

**Sequence-level**
   Filters entire sequences based on whether they contain matching entities. Maintains complete sequence context (all entities kept or none).

**Trajectory-level**
   Filters trajectories based on whether they match the specified criteria.
   Available only for TrajectoryPool operations.

.. note::

   - Not all criteria support all filtering levels. See compatibility table below.
   - Entity-level filters are not allowed for StateSequence (they would break the continuous nature of states).

Criterion Types

.. topic:: QueryCriterion

   Pandas-style query filtering on entity attributes.

       from tanat.criterion import QueryCriterion

   **Attributes:**

   - `query` *(str)*: Pandas query expression (uses `DataFrame.query()` syntax).

   **Filtering Levels:** Entity ✓ | Sequence ✓ | Trajectory (via sequence)

   **Examples:**

       # Simple equality
       criterion = QueryCriterion(query="visit_type == 'EMERGENCY'")

       # Numeric comparison
       criterion = QueryCriterion(query="age > 65")

       # Multiple conditions
       criterion = QueryCriterion(query="age > 65 and chronic_condition == True")

       # Using 'in' operator
       criterion = QueryCriterion(query="visit_type in ['SPECIALIST', 'EMERGENCY']")

.. topic:: PatternCriterion

   Sequential pattern matching on entity values.

       from tanat.criterion import PatternCriterion

   **Attributes:**

   - `pattern` *(Dict[str, str | List[str]])*: Feature names to values or sequences.
   - `contains` *(bool, default: False)*: If True, pattern can occur anywhere.
   - `case_sensitive` *(bool, default: True)*: If False, ignore case in matching.
   - `operator` *(str, default: "and")*: Combine multiple patterns ("and" or "or").

   **Filtering Levels:** Entity ✓ | Sequence ✓ | Trajectory (via sequence)

   **Examples:**

       # Sequential pattern (ordered)
       criterion = PatternCriterion(
           pattern={"health_state": ["SICK", "TREATMENT", "RECOVERY"]},
           contains=True
       )
       filtered = pool.filter(criterion, level="sequence")

       # Regex pattern
       criterion = PatternCriterion(
           pattern={"visit_type": ["regex:^S", "LABORATORY"]},
           contains=True
       )

.. topic:: TimeCriterion

   Time window filtering on temporal boundaries.

       from tanat.criterion import TimeCriterion

   **Attributes:**

   - `start_after` *(datetime | int, default: None)*: Minimum start time.
   - `start_before` *(datetime | int, default: None)*: Maximum start time.
   - `end_after` *(datetime | int, default: None)*: Minimum end time.
   - `end_before` *(datetime | int, default: None)*: Maximum end time.
   - `duration_within` *(bool, default: False)*: Entity must be entirely within bounds.
   - `sequence_within` *(bool, default: False)*: Entire sequence must be within bounds.

   **Filtering Levels:** Entity ✓ | Sequence ✓ | Trajectory (via sequence)

   **Examples:**

       from datetime import datetime, timedelta

       # Recent time window (last 3 months)
       recent_start = datetime.now() - timedelta(days=90)
       criterion = TimeCriterion(start_after=recent_start, end_before=datetime.now())
       filtered = pool.filter(criterion, level="entity")

       # Entire sequence must be within window
       criterion = TimeCriterion(
           start_after=datetime(2024, 1, 1),
           end_before=datetime(2024, 12, 31),
           sequence_within=True
       )

.. topic:: LengthCriterion

   Sequence length filtering based on entity count.

       from tanat.criterion import LengthCriterion

   **Attributes:**

   - `eq` *(int, default: None)*: Equal to length.
   - `ne` *(int, default: None)*: Not equal to length.
   - `gt` *(int, default: None)*: Greater than length.
   - `ge` *(int, default: None)*: Greater than or equal to length.
   - `lt` *(int, default: None)*: Less than length.
   - `le` *(int, default: None)*: Less than or equal to length.

   **Filtering Levels:** Sequence ✓ | Trajectory (via sequence)

   **Examples:**

       # Sequences with at least 5 entities
       criterion = LengthCriterion(ge=5)
       filtered = pool.filter(criterion)

       # Sequences with exactly 10 entities
       criterion = LengthCriterion(eq=10)

.. topic:: StaticCriterion

   Filtering based on static (non-temporal) features.

       from tanat.criterion import StaticCriterion

   **Attributes:**

   - `query` *(str)*: Pandas query expression on static data (same syntax as QueryCriterion).
   **Filtering Levels:** Sequence ✓ | Trajectory ✓

   **Examples:**

       # Demographic filtering
       criterion = StaticCriterion(query="age > 65")
       filtered = pool.filter(criterion)

       # Multiple static conditions
       criterion = StaticCriterion(query="age > 65 and chronic_condition == True")

Applying Criterion

All criteria are applied through the `filter()` method on pools.

Basic Filtering

    # Entity-level filtering
    filtered_pool = pool.filter(criterion, level="entity")

    # Sequence-level filtering
    filtered_pool = pool.filter(criterion, level="sequence")

    # Default level (typically sequence)
    filtered_pool = pool.filter(criterion)

Identifying Matches

Use `which()` to get IDs of matching sequences without filtering.

    # Get sequence IDs matching criterion
    matching_ids = pool.which(criterion)

    # Type: set of sequence IDs
    print(type(matching_ids))  # <class 'set'>

    # Use for set operations
    cohort_a = pool.which(criterion_a)
    cohort_b = pool.which(criterion_b)
    intersection = cohort_a.intersection(cohort_b)

Advanced Filtering

Combining Multiple Criterion

Use sequential filtering or set operations.

**Sequential approach:**

    # Apply filters in sequence
    pool_filtered = (
        pool
        .filter(StaticCriterion(query="age > 65"))
        .filter(QueryCriterion(query="visit_type == 'EMERGENCY'"), level="sequence")
        .filter(LengthCriterion(gt=5))
    )

**Set-based approach:**

    # Get IDs for each criterion
    elderly = pool.which(StaticCriterion(query="age > 65"))
    with_emergency = pool.which(
        QueryCriterion(query="visit_type == 'EMERGENCY'")
    )
    sufficient_data = pool.which(LengthCriterion(gt=5))

    # Combine with set operations
    final_cohort = elderly.intersection(with_emergency).intersection(sufficient_data)

    # Create filtered pool
    filtered_pool = pool.subset(final_cohort)

Negation and Exclusion

Exclude sequences matching a criterion.
    # Get all sequence IDs
    all_ids = set(pool.unique_ids)

    # Get IDs to exclude
    to_exclude = pool.which(criterion)

    # Get complement
    to_keep = all_ids - to_exclude

    # Create filtered pool
    filtered_pool = pool.subset(to_keep)

Conditional Filtering

Apply different criteria based on conditions.

    # Different criteria for different risk levels
    high_risk_pool = pool.filter(StaticCriterion(query="risk_level == 'HIGH'"))
    low_risk_pool = pool.filter(StaticCriterion(query="risk_level == 'LOW'"))

    # Apply risk-specific criteria
    high_risk_filtered = high_risk_pool.filter(LengthCriterion(gt=10))
    low_risk_filtered = low_risk_pool.filter(LengthCriterion(gt=5))

----------------------------------------

## Manipulation

Data Manipulation API

This page provides a comprehensive overview of data manipulation methods available across different TanaT objects.

.. role:: green
.. role:: red
.. role:: blue

.. contents:: Table of Contents
   :local:
   :depth: 2

Overview

TanaT provides a rich set of manipulation methods for temporal data analysis. This reference shows which methods are available for each object type.

**Legend:**

* :green:`✓` : Method available
* :red:`✗` : Method not available
* :blue:`✓\*` : Accepts optional `sequence_name` parameter (Trajectory/TrajectoryPool)

Method Compatibility Matrix

Position-based Selection

Methods for selecting entities by their position/rank in sequences.

.. list-table::
   :header-rows: 1
   :widths: 35 15 15 18 17

   * - Method
     - Sequence
     - SequencePool
     - Trajectory
     - TrajectoryPool
   * - `head(n)`
     - :green:`✓`
     - :green:`✓`
     - :blue:`✓\*`
     - :blue:`✓\*`
   * - `tail(n)`
     - :green:`✓`
     - :green:`✓`
     - :blue:`✓\*`
     - :blue:`✓\*`
   * - `slice(start, end, step)`
     - :green:`✓`
     - :green:`✓`
     - :blue:`✓\*`
     - :blue:`✓\*`
   * - `[index]`
     - :green:`✓`
     - :red:`✗`
     - :red:`✗`
     - :red:`✗`
   * - `[start:end:step]`
     - :green:`✓`
     - :red:`✗`
     - :red:`✗`
     - :red:`✗`

:blue:`✓\*` Optional `sequence_name` parameter for Trajectory/TrajectoryPool objects. Applies to all sequences if not specified.
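The position-based methods in this table follow familiar slicing conventions. As a rough illustration only, the sketch below mimics the documented `head(n)`, `tail(n)`, and `slice(start, end, step)` behavior (including negative `n`) on a plain Python list. The helper functions and the `visits` list are hypothetical stand-ins, not TanaT code.

```python
# Plain-Python stand-ins for the documented position-based semantics.
# These helpers are illustrative assumptions, not part of TanaT.

def head(items, n):
    """First n items; a negative n drops the last |n| items."""
    return items[:n]

def tail(items, n):
    """Last n items; a negative n drops the first |n| items."""
    if n == 0:
        return []
    return items[-n:]

def slice_items(items, start=None, end=None, step=None):
    """Position range with optional step, like Python slicing."""
    return items[start:end:step]

visits = ["GP", "SPECIALIST", "GP", "EMERGENCY", "GP"]

first_two = head(visits, 2)            # first two entities
all_but_last_two = head(visits, -2)    # everything except the last two
last_two = tail(visits, 2)             # last two entities
all_but_first_two = tail(visits, -2)   # everything except the first two
every_second = slice_items(visits, step=2)
```

On real TanaT objects the equivalent calls return sequences or pools rather than lists; only the index arithmetic is shown here.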
**Method Descriptions:**

`head(n)`
   Get first `n` entities. Negative values return all except last `|n|` entities. For Trajectory/TrajectoryPool: use `sequence_name` parameter to target specific sequence.

`tail(n)`
   Get last `n` entities. Supports negative values to get all except first `|n|`. For Trajectory/TrajectoryPool: use `sequence_name` parameter to target specific sequence.

`slice(start, end, step)`
   Select entities by position range with optional step for sampling. Supports negative indices. For Trajectory/TrajectoryPool: use `sequence_name` parameter to target specific sequence.

`[index]`
   Python-style single index access. Returns Entity object (Sequence only).

`[start:end:step]`
   Python-style slice notation. Returns new Sequence (Sequence only).

**Examples:**

    # Sequence - Get first 10 entities
    first_10 = sequence.head(10)

    # Sequence - Get all except last 2
    all_but_last_2 = sequence.head(-2)

    # SequencePool - Apply to all sequences
    pool_first_5 = pool.head(5)

    # Trajectory - Specific sequence
    traj_first = trajectory.head(10, sequence_name="prescriptions")

    # Slicing with step
    every_second = sequence.slice(step=2)
    positions_10_to_50 = sequence.slice(start=10, end=50)

    # Python indexing (Sequence only)
    first_entity = sequence[0]
    last_entity = sequence[-1]
    subset = sequence[10:50:2]  # start:end:step

Filtering & Selection

Methods for conditional selection and filtering.

.. list-table::
   :header-rows: 1
   :widths: 35 15 15 18 17

   * - Method
     - Sequence
     - SequencePool
     - Trajectory
     - TrajectoryPool
   * - `filter(criterion)`
     - :green:`✓`
     - :green:`✓`
     - :blue:`✓\*`
     - :blue:`✓\*`
   * - `subset(ids)`
     - :red:`✗`
     - :green:`✓`
     - :red:`✗`
     - :green:`✓`
   * - `which(criterion)`
     - :red:`✗`
     - :green:`✓`
     - :red:`✗`
     - :green:`✓`
   * - `match(criterion)`
     - :green:`✓`
     - :red:`✗`
     - :green:`✓`
     - :red:`✗`

:blue:`\*` Optional `sequence_name` parameter for Trajectory objects (applies to all sequences if not specified).
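Because `which()` is documented as returning a set of IDs, cohorts can be combined with ordinary Python set algebra before calling `subset()`. The sketch below uses hand-written ID sets in place of real `which()` results; all IDs and variable names are hypothetical.

```python
# Hypothetical ID sets standing in for the results of pool.which(...).
all_ids = {"P001", "P002", "P003", "P004"}
elderly = {"P001", "P002", "P003"}
with_emergency = {"P002", "P003", "P004"}
sufficient_data = {"P002", "P004"}

# Intersection: IDs that satisfy every criterion.
final_cohort = elderly & with_emergency & sufficient_data

# Union: IDs that satisfy at least one criterion.
any_match = elderly | sufficient_data

# Complement: IDs that do NOT satisfy a criterion (negation).
without_emergency = all_ids - with_emergency

print(sorted(final_cohort))
print(sorted(without_emergency))
```

The resulting set can then be passed to `pool.subset(...)`, as in the set-based filtering examples of the Criterion section.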
**Method Descriptions:**

`filter(criterion)`
   Apply a filtering criterion at the entity, sequence, or trajectory level. Use the `sequence_name` parameter to specify entity/sequence level filtering from Trajectory/TrajectoryPool.

`subset(ids)`
   Extract subset by sequence IDs. Available for Pool objects only.

`which(criterion)`
   Get IDs of sequences/trajectories matching criterion. Available for Pool objects only.

`match(criterion)`
   Test if sequence/trajectory matches criterion, returns boolean.

**Examples:**

    # Entity-level filtering with query
    from tanat.criterion.mixin.query.settings import QueryCriterion

    criterion = QueryCriterion(query="event_type == 'EMERGENCY'")
    filtered = sequence.filter(criterion)

    # Pool - filter sequences by length
    from tanat.criterion.sequence.type.length.settings import LengthCriterion

    length_criterion = LengthCriterion(gt=10)
    long_sequences = pool.filter(length_criterion, level="sequence")

    # Pool - get IDs matching criterion
    matching_ids = pool.which(length_criterion)

    # Pool - subset by IDs
    subset_pool = pool.subset(["seq-1", "seq-3", "seq-5"])

    # Trajectory filtering (specific sequence)
    rank_criterion = {"start": 0, "end": 50}
    traj_filtered = trajectory.filter(
        rank_criterion,
        sequence_name="prescriptions",
        criterion_type="rank"
    )

Temporal Alignment

Methods for setting temporal reference point (T0).

.. list-table::
   :header-rows: 1
   :widths: 35 15 15 18 17

   * - Method
     - Sequence
     - SequencePool
     - Trajectory
     - TrajectoryPool
   * - `zero_from_query(query)`
     - :green:`✓`
     - :green:`✓`
     - :blue:`✓\*`
     - :blue:`✓\*`
   * - `zero_from_position(pos)`
     - :green:`✓`
     - :green:`✓`
     - :blue:`✓\*`
     - :blue:`✓\*`

:blue:`\*` `sequence_name` parameter behavior differs by method (see descriptions below).

**Method Descriptions:**

`zero_from_query(query)`
   Set T0 from query on sequence data. For Trajectory/TrajectoryPool: **requires** `sequence_name` parameter to specify which sequence to query.

`zero_from_position(pos)`
   Set T0 from entity position (0-based indexing).
   For Trajectory/TrajectoryPool: **optional** `sequence_name` parameter. If `None`, uses position across all sequences.

**Examples:**

    # Sequence - Set T0 from query
    sequence.zero_from_query("event_type == 'DIAGNOSIS'")

    # Sequence - Set T0 from position (5th entity)
    sequence.zero_from_position(4)  # 0-based indexing

    # SequencePool - applies to all sequences
    pool.zero_from_query("medication == 'INSULIN'")

    # Trajectory - query REQUIRES sequence_name
    trajectory.zero_from_query(
        "event_type == 'ADMISSION'",
        sequence_name="hospital_events"  # Required!
    )

    # Trajectory - position with specific sequence
    trajectory.zero_from_position(0, sequence_name="prescriptions")

    # Trajectory - position across ALL sequences (sequence_name=None)
    trajectory.zero_from_position(10)  # Uses the entity at position 10 (the 11th) across all sequences

Temporal Transformations

Methods for temporal data transformations.

.. list-table::
   :header-rows: 1
   :widths: 35 15 15 18 17

   * - Method
     - Sequence
     - SequencePool
     - Trajectory
     - TrajectoryPool
   * - `to_relative_time()`
     - :green:`✓`
     - :green:`✓`
     - :red:`✗`
     - :red:`✗`
   * - `to_relative_rank()`
     - :green:`✓`
     - :green:`✓`
     - :red:`✗`
     - :red:`✗`
   * - `to_time_spent()`
     - :green:`✓`
     - :green:`✓`
     - :red:`✗`
     - :red:`✗`
   * - `to_occurrence()`
     - :green:`✓`
     - :green:`✓`
     - :red:`✗`
     - :red:`✗`

**Method Descriptions:**

`to_relative_time()`
   Convert timestamps to time relative to T0. Requires T0 to be set.

`to_relative_rank()`
   Convert positions to ranks relative to T0 entity.

`to_time_spent()`
   Compute time spent in each state/interval. Available for StateSequence and IntervalSequence only.

`to_occurrence()`
   Count occurrences of events/states up to each position.
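To make the relative-time and relative-rank notions concrete, here is a minimal standard-library sketch. It assumes one sequence of events sorted by time, with T0 taken as the first `EMERGENCY` event (the `use_first=True` case). The `events` list and the `first_match` helper are hypothetical, and TanaT's internal computation may differ.

```python
from datetime import datetime

# Hypothetical single sequence of (timestamp, visit_type), sorted by time.
events = [
    (datetime(2024, 1, 11), "ROUTINE"),
    (datetime(2024, 1, 15), "EMERGENCY"),
    (datetime(2024, 1, 20), "FOLLOWUP"),
]

def first_match(sequence, value):
    """Return the timestamp of the first entity equal to value, or None."""
    for timestamp, label in sequence:
        if label == value:
            return timestamp
    return None  # no match: this sequence has no T0

t_zero = first_match(events, "EMERGENCY")

# Relative time: signed number of days between each entity and T0.
relative_days = [(timestamp - t_zero).days for timestamp, _ in events]

# Relative rank: signed position offset from the entity located at T0.
t0_index = next(i for i, (timestamp, _) in enumerate(events) if timestamp == t_zero)
relative_ranks = [i - t0_index for i in range(len(events))]

print(relative_days)
print(relative_ranks)
```

Negative values mark entities before T0, zero marks the T0 entity, and positive values mark entities after it, which matches the sign convention described in the Alignment section.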
**Examples:**

    # Convert to relative time (requires T0)
    sequence.zero_from_position(0)
    relative_sequence = sequence.to_relative_time()

    # Convert to relative ranks
    ranked_sequence = sequence.to_relative_rank()

    # Time spent in each state (StateSequence)
    state_sequence.to_time_spent()

    # Count occurrences
    event_sequence.to_occurrence()

Type Conversion

Methods for converting between sequence types.

.. list-table::
   :header-rows: 1
   :widths: 35 15 15 18 17

   * - Method
     - Sequence
     - SequencePool
     - Trajectory
     - TrajectoryPool
   * - `as_event()`
     - :green:`✓`
     - :green:`✓`
     - :red:`✗`
     - :red:`✗`
   * - `as_interval()`
     - :green:`✓`
     - :green:`✓`
     - :red:`✗`
     - :red:`✗`
   * - `as_state()`
     - :green:`✓`
     - :green:`✓`
     - :red:`✗`
     - :red:`✗`

**Method Descriptions:**

`as_event()`
   Convert to EventSequence(Pool).

`as_interval()`
   Convert to IntervalSequence(Pool).

`as_state()`
   Convert to StateSequence(Pool).

**Examples:**

    # Convert interval to event (takes start time)
    event_sequence = interval_sequence.as_event()

    # Convert event to interval (requires end time strategy)
    interval_sequence = event_sequence.as_interval()

    # Convert interval to state
    state_sequence = interval_sequence.as_state()

Feature Engineering

Methods for adding or removing features.

.. list-table::
   :header-rows: 1
   :widths: 35 15 15 18 17

   * - Method
     - Sequence
     - SequencePool
     - Trajectory
     - TrajectoryPool
   * - `add_entity_feature()`
     - :green:`✓`
     - :green:`✓`
     - :red:`✗`
     - :red:`✗`
   * - `drop_entity_feature()`
     - :green:`✓`
     - :green:`✓`
     - :red:`✗`
     - :red:`✗`
   * - `add_static_features()`
     - :green:`✓`
     - :green:`✓`
     - :green:`✓`
     - :green:`✓`
   * - `drop_static_feature()`
     - :green:`✓`
     - :green:`✓`
     - :green:`✓`
     - :green:`✓`

**Method Descriptions:**

`add_entity_feature()`
   Add computed entity-level feature to sequence data.

`drop_entity_feature()`
   Remove entity-level feature from sequence data.

`add_static_features()`
   Add computed static (sequence-level) feature from external data.

`drop_static_feature()`
   Remove static feature.
**Examples:**

    # Add entity feature
    sequence.add_entity_feature(
        "posology_mg",
        values=[100, 200, 150, ...]
    )

    # Add static feature
    pool.add_static_features(
        static_data=df_static,
        id_column="patient_id",
        static_features=["age", "gender"],
        override=False
    )

    # Drop features
    sequence.drop_entity_feature("posology_mg")
    pool.drop_static_feature("age")

Descriptive Statistics

Methods for computing descriptive statistics.

.. list-table::
   :header-rows: 1
   :widths: 35 15 15 18 17

   * - Method
     - Sequence
     - SequencePool
     - Trajectory
     - TrajectoryPool
   * - `describe()`
     - :green:`✓`
     - :green:`✓`
     - :green:`✓`
     - :green:`✓`
   * - `statistics`
     - :green:`✓`
     - :green:`✓`
     - :green:`✓`
     - :green:`✓`

**Method Descriptions:**

`describe()`
   Statistical description of sequence data in pandas-style format (DataFrame). Includes length, vocabulary size, entropy, and other metrics.

`statistics`
   Property that computes key statistics as a dictionary. For Trajectory objects, automatically prefixes sequence-specific stats (e.g., `diagnosis_length`).

**Examples:**

    # Statistical description (DataFrame)
    desc_df = sequence.describe()
    print(desc_df)

    # Add description to static data
    desc_df = sequence.describe(add_to_static=True)
    print(sequence.static_data)  # desc_df merged to static

    # Object statistics (dict)
    stats = sequence.statistics
    print(f"Length: {stats['length']}")
    print(f"Vocabulary: {stats['vocab_size']}")

    # Pool statistics
    pool_stats = pool.statistics
    print(f"Total sequences: {pool_stats['total_sequences']}")
    print(f"Avg length: {pool_stats['avg_length']:.1f}")

    # Trajectory statistics (prefixed by sequence name)
    traj_stats = trajectory.statistics
    print(f"Diagnosis length: {traj_stats['diagnosis_length']}")
    print(f"Medication vocab: {traj_stats['medication_vocab_size']}")

Copy & Modification

Methods for copying and in-place modifications.

.. 
list-table:: :header-rows: 1 :widths: 35 15 15 18 17 * - Method - Sequence - SequencePool - Trajectory - TrajectoryPool * - `copy(deep=True)` - :green:`✓` - :green:`✓` - :green:`✓` - :green:`✓` * - `inplace=True` - :green:`✓` - :green:`✓` - :green:`✓` - :green:`✓` **Method Descriptions:** `copy(deep=True)` Create a copy of the object. Use `deep=False` for shallow copy. `inplace=True` Most manipulation methods support `inplace=True` parameter to modify the object in place instead of returning a new copy. **Examples:** # Create copy sequence_copy = sequence.copy() # In-place modification sequence.head(10, inplace=True) # Modifies sequence directly pool.filter(criterion, inplace=True) # Modifies pool directly Notes on Trajectory-specific Behavior Methods marked with :blue:`✓\*` for Trajectory objects accept additional parameters: `sequence_name` Parameter (Optional) Many methods that operate on sequences accept an optional `sequence_name` argument to specify which sequence to operate on. # Position-based selection trajectory.head(10, sequence_name="prescriptions") trajectory.tail(5, sequence_name="events") trajectory.slice(start=0, end=50, sequence_name="events") # Filtering entity within `events` sequence trajectory.filter( criterion={"event_type": "EMERGENCY"}, sequence_name="events", criterion_type="query", level="entity" ) # Temporal alignment trajectory.zero_from_query( "medication == 'INSULIN'", sequence_name="prescriptions" ) # TrajectoryPool - same pattern pool.head(10, sequence_name="prescriptions") Apply to All Sequences **Most methods** (`head`, `tail`, `slice`, `filter`, `zero_from_position`) support `sequence_name=None` to apply the operation to **all sequences** simultaneously: # Apply to all sequences trajectory.head(10) # Apply head(10) to all sequences trajectory.tail(5) # Apply tail(5) to all sequences trajectory.zero_from_position(0) # Set T0 at first entity across all sequences # TrajectoryPool pool.slice(start=0, end=20) # Apply to all sequences 
in all trajectories **Exception:** `zero_from_query` always requires an explicit `sequence_name` because queries are sequence-specific: # This is REQUIRED - cannot query across all sequences trajectory.zero_from_query( query="event_type == 'ADMISSION'", sequence_name="events" # Must specify which sequence to query ) ---------------------------------------- ## Metadata Metadata Reference documentation for TanaT's metadata system. Overview Metadata describes the structure, types, and constraints of temporal data in TanaT. It is automatically inferred but can be inspected and updated as needed. Metadata Components .. topic:: Temporal Metadata Describes time representation. Two types are supported. **Datetime** (calendar time with timezone): { "temporal_type": "datetime", "granularity": "second", "settings": {"timezone": "UTC", "date_format": "%Y-%m-%d %H:%M:%S"} } **Timestep** (abstract numerical time): { "temporal_type": "timestep", "granularity": "unit", "settings": {"min_value": 0, "max_value": 100} } .. topic:: Entity Metadata Describes features within sequences. Supported types: - **categorical**: Discrete categories (ordered or unordered) - **numerical**: Continuous or discrete numbers - **duration**: Time durations with specific granularity .. topic:: Static Metadata Describes additional features not tied to temporal extent. Available in both `SequencePool` and `TrajectoryPool`. Static features use the same structure as entity metadata (categorical, numerical, or duration types). Update Methods .. topic:: update_temporal_metadata() Update temporal metadata settings. pool.update_temporal_metadata(temporal_type=None, granularity=None, **kwargs) **Attributes:** - `temporal_type` *(str)*: `"datetime"` or `"timestep"`. - `granularity` *(str)*: Time unit (e.g., `"day"`, `"hour"`). Use `"unit"` for timestep. - `settings` *(Dict)*: Type-specific settings as dictionary. - `**kwargs`: Override settings (timezone, min_value, max_value, etc.). 
**Type-Specific Attributes:** *Datetime* (see `DateTimeSettings`): - `min_value`: Minimum datetime value in the data - `max_value`: Maximum datetime value in the data - `timezone`: Timezone string (e.g., `'UTC'`, `'Europe/Paris'`) - `format`: Optional datetime format string for parsing *Timestep* (see `TimestepSettings`): - `min_value`: Minimum timestep value (numeric) - `max_value`: Maximum timestep value (numeric) - `dtype`: Target pandas dtype (e.g., `'int64'`, `'float32'`) **Example:** # Update timezone pool.update_temporal_metadata(timezone="Europe/Paris") # Switch to timestep pool.update_temporal_metadata(temporal_type="timestep") .. topic:: update_entity_metadata() Update metadata for an entity feature. pool.update_entity_metadata(feature_name: str, feature_type: str = None, settings: Dict|FeatureSettings = None, **kwargs) -> self **Attributes:** - `feature_name` *(str)*: Name of the feature. - `feature_type` *(str)*: Type of feature: `"categorical"`, `"numerical"`, or `"duration"`. - `settings` *(Dict|FeatureSettings)*: Feature-specific settings as a dictionary or `FeatureSettingsBase` object. - `**kwargs`: Feature-specific attributes that override `settings` if both are provided.
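The precedence rule stated above (keyword arguments win over `settings` when both are given) can be pictured as a plain dictionary merge. The sketch below is illustrative only: `resolve_feature_settings` is a hypothetical helper, not part of TanaT, and it makes no claim about how the library performs the merge internally.

```python
def resolve_feature_settings(settings=None, **kwargs):
    """Hypothetical helper: merge a settings dict with keyword overrides."""
    resolved = dict(settings or {})  # copy so the caller's dict is left untouched
    resolved.update(kwargs)          # keyword arguments take precedence
    return resolved


base = {"categories": ["A", "B"], "ordered": False}
resolved = resolve_feature_settings(base, ordered=True)
print(resolved)
```

Under this reading, passing both a `settings` dictionary and a conflicting keyword keeps the keyword value, while untouched keys from `settings` are preserved.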
**Feature-Specific Attributes:** *Categorical features* (see `CategoricalFeatureSettings`): - `categories`: List of valid category values - `ordered`: Whether categories have an ordering *Numerical features* (see `NumericalFeatureSettings`): - `dtype`: Target pandas dtype (e.g., `'float32'`, `'int64'`) - `min_value`: Minimum value in the data - `max_value`: Maximum value in the data *Duration features* (see `DurationFeatureSettings`): - `granularity`: Time unit - `HOUR`, `DAY`, `WEEK`, `MONTH`, or `YEAR` (default: `DAY`) - `min_value`: Minimum duration (numeric or timedelta) - `max_value`: Maximum duration (numeric or timedelta) **Example:** # Categorical feature pool.update_entity_metadata( feature_name="status", feature_type="categorical", categories=["A", "B", "C"], ordered=True ) # Duration feature pool.update_entity_metadata( feature_name="duration_hours", feature_type="duration", granularity="hour" ) .. topic:: update_static_metadata() Update metadata for a static feature. pool.update_static_metadata(feature_name: str, feature_type: str = None, settings: Dict|FeatureSettings = None, **kwargs) -> self **Available in:** Both `SequencePool` and `TrajectoryPool` Same attributes as `update_entity_metadata()`. **Example:** # In a SequencePool pool.update_static_metadata( feature_name="gender", feature_type="categorical", categories=["M", "F", "Other"] ) # In a TrajectoryPool trajectory.update_static_metadata( feature_name="birth_year", feature_type="numerical", dtype="int" ) Inspection Methods .. topic:: metadata.describe() Human-readable metadata description. pool.metadata.describe(verbose: bool = False) -> str **Attributes:** - `verbose` *(bool)*: If True, include detailed descriptions **Example:** print(pool.metadata.describe(verbose=True)) .. topic:: metadata.view() Display metadata as YAML with inline documentation. 
pool.metadata.view() -> None **Example:** pool.metadata.view() Metadata Propagation **In SequencePool** Updates affect all sequences in the pool: pool.update_temporal_metadata(timezone="UTC") # All sequences now use UTC **In TrajectoryPool** Temporal updates propagate to all contained sequence pools: trajectory.update_temporal_metadata(timezone="UTC") # All sequence pools in trajectory now use UTC Best Practices 1. **Let TanaT infer first**: Automatic inference handles most cases 2. **Inspect before updating**: Use `.metadata.describe(verbose=True)` 3. **Update using dedicated methods**: Use `update_*_metadata()` methods pool.update_temporal_metadata(timezone="UTC") \ .update_entity_metadata("status", feature_type="categorical") \ .update_static_metadata("gender", feature_type="categorical") ---------------------------------------- ## Metrics Metrics Reference Reference documentation for TanaT's distance metrics system for temporal sequence analysis. Overview TanaT uses a **hierarchical metric composition** approach with three levels: .. 
list-table:: :header-rows: 1 :widths: 20 40 40 * - Level - Description - Use Case * - **EntityMetric** - Compares individual entities (single time point observations) - Define how to compare two entities * - **SequenceMetric** - Compares entire sequences (uses EntityMetric for element-wise comparison) - Compare patient care pathways * - **TrajectoryMetric** - Compares multi-sequence trajectories (aggregates sequence distances) - Compare complete patient records Metric Composition Most sequence metrics follow this pattern: from tanat.metric.sequence import DTWSequenceMetric from tanat.metric.entity import HammingEntityMetric # EntityMetric defines how to compare individual entities entity_metric = HammingEntityMetric( settings={"entity_features": ["state", "medication"]} ) # SequenceMetric uses EntityMetric for element-wise comparison dtw = DTWSequenceMetric( settings={"entity_metric": entity_metric} ) # Compute distance between two sequences distance = dtw(sequence_a, sequence_b) **Simplified syntax** (uses defaults): # Uses "hamming" entity metric by default dtw = DTWSequenceMetric() distance = dtw(sequence_a, sequence_b) Entity Metrics Entity metrics compare **individual entities** (observations at a single time point). They are the building blocks used by sequence metrics for element-wise comparisons. .. topic:: HammingEntityMetric The default entity metric. Compares entities by checking equality. from tanat.metric.entity import HammingEntityMetric **Settings:** - `entity_features` *(List[str], default: None)*: Feature names to compare. If None, uses all available entity features. - `cost` *(Dict / Loader, default: None)*: Custom substitution costs between value pairs. If None, uses 0/1 (equal/different). - `default_value` *(float, default: 0.0)*: Cost for undefined pairs when using custom cost dict. HammingEntityMetric only supports categorical features for single-feature comparisons. 
For multiple features, each entity tuple becomes a composite category (so numerical features are acceptable as part of a composite). **Example:** hamming = HammingEntityMetric( settings={"entity_features": ["state", "medication"]} ) distance = hamming(entity_a, entity_b) Sequence Metrics Sequence metrics compare **entire sequences** by considering the order and timing of entities. Most sequence metrics use an EntityMetric internally for element-wise comparisons. .. topic:: DTWSequenceMetric Dynamic Time Warping. Finds optimal alignment allowing time stretching/compression. from tanat.metric.sequence import DTWSequenceMetric **Settings:** - `entity_metric` *(str / EntityMetric, default: "hamming")*: Metric for comparing entities. - `window` *(int, default: None)*: Sakoe-Chiba band width. None = no constraint. - `max_time_diff` *(timedelta / int, default: None)*: Maximum time difference between compared events. - `normalize` *(bool, default: False)*: If True, normalize by warping path length. - `distance_matrix` *(MatrixStorageOptions)*: Options for disk storage and resume support. **Example:** dtw = DTWSequenceMetric(settings={"window": 10, "normalize": True}) distance = dtw(seq_a, seq_b) .. topic:: SoftDTWSequenceMetric Differentiable version of DTW. from tanat.metric.sequence import SoftDTWSequenceMetric **Settings:** - `entity_metric` *(str / EntityMetric, default: "hamming")*: Metric for comparing entities. - `gamma` *(float, default: 1.0)*: Smoothing parameter. Lower values → closer to standard DTW. - `distance_matrix` *(MatrixStorageOptions)*: Options for disk storage and resume support. **Example:** soft_dtw = SoftDTWSequenceMetric(settings={"gamma": 0.1}) distance = soft_dtw(seq_a, seq_b) .. topic:: EditSequenceMetric Edit distance. Counts minimum insertions, deletions, substitutions. from tanat.metric.sequence import EditSequenceMetric **Settings:** - `entity_metric` *(str / EntityMetric, default: "hamming")*: Metric for substitution cost. 
- `indel_cost` *(float, default: 1.0)*: Cost for insertion/deletion operations. - `normalize` *(bool, default: False)*: If True, normalize by maximum sequence length. - `distance_matrix` *(MatrixStorageOptions)*: Options for disk storage and resume support. **Example:** edit = EditSequenceMetric(settings={"indel_cost": 1.0, "normalize": True}) distance = edit(seq_a, seq_b) .. topic:: LCSSequenceMetric Longest Common Subsequence. Similarity based on common elements (order matters). from tanat.metric.sequence import LCSSequenceMetric **Settings:** - `entity_metric` *(str / EntityMetric, default: "hamming")*: Metric for comparing entities. - `equality_threshold` *(float, default: 0.0)*: Maximum distance to consider entities as equal. - `as_distance` *(bool, default: False)*: If True, returns distance instead of LCS length. - `normalize` *(bool, default: False)*: If True, uses normalized distance formula. - `distance_matrix` *(MatrixStorageOptions)*: Options for disk storage and resume support. **Example:** lcs = LCSSequenceMetric(settings={"as_distance": True, "normalize": True}) distance = lcs(seq_a, seq_b) .. topic:: LCPSequenceMetric Longest Common Prefix. Similarity based on matching prefix from the start. from tanat.metric.sequence import LCPSequenceMetric **Settings:** - `entity_metric` *(str / EntityMetric, default: "hamming")*: Metric for comparing entities. - `equality_threshold` *(float, default: 0.0)*: Maximum distance to consider entities as equal. - `as_distance` *(bool, default: False)*: If True, returns distance instead of LCP length. - `normalize` *(bool, default: False)*: If True, uses normalized distance formula. - `distance_matrix` *(MatrixStorageOptions)*: Options for disk storage and resume support. **Example:** lcp = LCPSequenceMetric(settings={"as_distance": True}) distance = lcp(seq_a, seq_b) .. topic:: LinearPairwiseSequenceMetric Simple pairwise comparison. Compares sequences element by element.
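As a rough illustration of element-by-element comparison, the sketch below pairs entities by position and aggregates a 0/1 Hamming cost. It is not TanaT's implementation: `linear_pairwise` is a hypothetical function, and positions beyond the shorter sequence are simply ignored here, whereas the library has its own settings for aggregation and length differences.

```python
def linear_pairwise(seq_a, seq_b, agg="mean"):
    """Compare two sequences position by position with a 0/1 Hamming cost."""
    # zip() stops at the shorter sequence, so extra trailing entities are ignored
    costs = [0.0 if a == b else 1.0 for a, b in zip(seq_a, seq_b)]
    if not costs:
        return 0.0
    total = sum(costs)
    return total / len(costs) if agg == "mean" else total


seq_a = ["A", "B", "C", "D"]
seq_b = ["A", "C", "C", "A"]
mean_distance = linear_pairwise(seq_a, seq_b)            # 2 mismatches over 4 positions
sum_distance = linear_pairwise(seq_a, seq_b, agg="sum")  # 2 mismatches in total
```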
from tanat.metric.sequence import LinearPairwiseSequenceMetric **Settings:** - `entity_metric` *(str / EntityMetric, default: "hamming")*: Metric for comparing entities. - `agg_fun` *(str, default: "mean")*: Aggregation function: "mean", "sum". - `padding_penalty` *(float, default: 0.0)*: Penalty for length difference. - `distance_matrix` *(MatrixStorageOptions)*: Options for disk storage and resume support. **Example:** linear = LinearPairwiseSequenceMetric(settings={"agg_fun": "sum"}) distance = linear(seq_a, seq_b) .. topic:: Chi2SequenceMetric Chi-squared distance. Compares time spent in each state (ignores temporal order). from tanat.metric.sequence import Chi2SequenceMetric **Settings:** - `entity_features` *(List[str], default: None)*: Feature(s) defining categories. - `distance_matrix` *(MatrixStorageOptions)*: Options for disk storage and resume support. Chi2 does **not** use an `entity_metric`. It computes state distributions directly. **Duration:** EventSequence counts events; StateSequence uses actual durations. **Example:** chi2 = Chi2SequenceMetric(settings={"entity_features": ["state"]}) distance = chi2(seq_a, seq_b) Computing Distance Matrices All metrics support computing pairwise distance matrices for pools: from tanat.metric.sequence import DTWSequenceMetric dtw = DTWSequenceMetric() dm = dtw.compute_matrix(sequence_pool) print(dm.shape) # (n_sequences, n_sequences) print(dm.to_dataframe()) **With disk caching** (for large pools): from tanat.metric.matrix import MatrixStorageOptions dtw = DTWSequenceMetric(settings={ "distance_matrix": MatrixStorageOptions( store_path="./cache/dtw_matrix", resume=True ) }) dm = dtw.compute_matrix(large_pool) Trajectory Metrics Trajectory metrics compare **multi-sequence trajectories** by aggregating distances across multiple sequence types. .. topic:: AggregationTrajectoryMetric Aggregates sequence-level distances across multiple sequence types. 
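Conceptually, aggregation reduces a set of per-sequence distances to a single trajectory distance. The sketch below shows a weighted mean in plain Python. `aggregate_distances` is a hypothetical helper, the distance values are illustrative, and the weighting formula is an assumption rather than TanaT's documented behaviour.

```python
def aggregate_distances(distances, weights=None):
    """Weighted mean of per-sequence distances (missing weights default to 1.0)."""
    weights = weights or {}
    total_weight = sum(weights.get(name, 1.0) for name in distances)
    weighted_sum = sum(d * weights.get(name, 1.0) for name, d in distances.items())
    return weighted_sum / total_weight


# Illustrative per-sequence distances between two trajectories
per_sequence = {"diagnoses": 0.5, "medications": 0.2}
unweighted = aggregate_distances(per_sequence)
weighted = aggregate_distances(per_sequence, weights={"diagnoses": 2.0})
```

With a weight of 2.0 on `diagnoses`, that sequence type contributes twice as much to the combined value as the unweighted `medications` distance.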
from tanat.metric.trajectory import AggregationTrajectoryMetric **Settings:** - `default_metric` *(str / SequenceMetric, default: "linearpairwise")*: Default metric for unlisted sequences. - `sequence_metrics` *(Dict[str, SequenceMetric], default: None)*: Metric per sequence type. - `agg_fun` *(str, default: "mean")*: How to combine sequence distances. - `weights` *(Dict[str, float], default: None)*: Weights per sequence type. - `distance_matrix` *(MatrixStorageOptions)*: Options for disk storage and resume support. **Example:** from tanat.metric.trajectory import AggregationTrajectoryMetric from tanat.metric.sequence import DTWSequenceMetric traj_metric = AggregationTrajectoryMetric(settings={ "sequence_metrics": {"diagnoses": DTWSequenceMetric()}, "weights": {"diagnoses": 2.0} }) distance = traj_metric(traj_a, traj_b) ---------------------------------------- ## Sequence container """ Sequence container ================== A sequence container allows storing and accessing sequence data. """ ### Required imports from datetime import datetime # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, generate_state_sequences, generate_interval_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, StateSequencePool, IntervalSequencePool, ) ## 1. Event sequences Let's create a simple sequence of events data. N_SEQ = 10 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 # Generate simple sequences of events simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) # Store data in a sequence pool simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) simple_pool # Access single sequence simple_pool["seq-0"] # Access first entity (0-based) simple_pool["seq-0"][0] ## 2. 
State sequences Let's create a simple dataset of state sequences. # State sequences data_states = generate_state_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["Z", "Y", "X", "W"], missing_data=0.0, entity_feature="states", seed=SEED, ) simple_settings = { "id_column": "id", "start_column": "start_date", "default_end_value": datetime.now(), # Avoid warning "entity_features": ["states"], } simple_pool = StateSequencePool(data_states, simple_settings) simple_pool # Access single sequence simple_pool["seq-0"] # Access first entity (0-based) simple_pool["seq-0"][0] ## 3. Interval sequences Let's create a simple dataset of interval sequences. # Interval sequences data_intervals = generate_interval_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["Z", "Y", "X", "W"], missing_data=0.0, entity_feature="states", seed=SEED, ) simple_settings = { "id_column": "id", "start_column": "start_date", "end_column": "end_date", "entity_features": ["states"], } simple_pool = IntervalSequencePool(data_intervals, simple_settings) simple_pool # Access single sequence simple_pool["seq-0"] # Access first entity (0-based) simple_pool["seq-0"][0] ## 4. Transformations Transform sequences to different data representations. The following transformations are available for both sequence pools and individual sequence objects.
# Convert to occurrence data occurrence_data = simple_pool.to_occurrence(by_id=True, drop_na=True) occurrence_data # Convert to occurrence frequency frequency_data = simple_pool.to_occurrence_frequency( by_id=False, drop_na=True, ) frequency_data # Convert to relative time relative_time_data = simple_pool.to_relative_time( drop_na=True, granularity="day", ) relative_time_data # Calculate time spent time_spent_data = simple_pool.to_time_spent( by_id=True, granularity="day", drop_na=True, ) time_spent_data # Calculate relative rank relative_rank_data = simple_pool.to_relative_rank(drop_na=True) relative_rank_data # Modify sequence starting point (t_zero) simple_pool.zero_from_position(2) updated_rank_data = simple_pool.to_relative_rank(drop_na=True) updated_rank_data ---------------------------------------- ## Trajectory Container """ Trajectory Container ==================== Work with multi-sequence trajectories for complex data. """ A trajectory container stores and provides access to trajectory data. Trajectories combine multiple sequence types, such as events, states, and intervals. ### Required Imports from datetime import datetime # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, generate_state_sequences, generate_interval_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, StateSequencePool, IntervalSequencePool, ) # Trajectory pool from tanat.trajectory import TrajectoryPool ## 1. 
Data Setup N_SEQ = 10 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 # Generate simple event sequences event_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) # Event sequence pool settings event_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } event_pool = EventSequencePool(event_data, event_settings) event_pool # Generate interval sequences interval_data = generate_interval_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["Z", "Y", "X", "W"], missing_data=0.0, entity_feature="states", seed=SEED, ) interval_settings = { "id_column": "id", "start_column": "start_date", "end_column": "end_date", "entity_features": ["states"], } interval_pool = IntervalSequencePool(interval_data, interval_settings) interval_pool # Generate state sequences state_data = generate_state_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["Z", "Y", "X", "W"], missing_data=0.0, entity_feature="states", seed=SEED, ) state_settings = { "id_column": "id", "start_column": "start_date", "default_end_value": datetime.now(), # Avoid warning "entity_features": ["states"], } state_pool = StateSequencePool(state_data, state_settings) state_pool ## 2. 
Build Trajectory Pool # Build trajectory pool trajectory_pool = TrajectoryPool.init_empty() trajectory_pool.add_sequence_pool(event_pool, "events") trajectory_pool.add_sequence_pool(state_pool, "states") trajectory_pool.add_sequence_pool(interval_pool, "intervals") # Update trajectory settings trajectory_pool.update_settings(intersection=False) # View trajectory pool trajectory_pool # Access trajectory by id target_id = "seq-0" trajectory_pool[target_id] # return all sub-sequences in the trajectory # Access sub-sequence trajectory_pool[target_id]["events"] trajectory_pool[target_id]["states"] trajectory_pool[target_id]["intervals"] # Access sequence pool within trajectory trajectory_pool.sequence_pools["events"] ---------------------------------------- ## Query criterion """ Query criterion ================== Filtering sequences or entities based on pandas-like query logic. """ ### Required imports # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, ) # Trajectory pools from tanat.trajectory import ( TrajectoryPool, ) # Criterion from tanat.criterion import ( QueryCriterion, ) ## 1. Data Setup Generate a simple sequence dataset to demonstrate filtering with query criterion. N_SEQ = 10 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 # Generate event sequences with predefined vocabulary simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) simple_pool ## 2. Entity Level Filtering Filter entities inside sequences based on a query criterion. 
criterion = QueryCriterion(query="event == 'A'") filtered_seqpool = simple_pool.filter(criterion, level="entity") filtered_seqpool # Filter entities in a single sequence single_seq = simple_pool["seq-0"] criterion = QueryCriterion(query="event == 'A'") filtered_seq = single_seq.filter(criterion) filtered_seq ## 3. Sequence Level Filtering Filter entire sequences that match the query criterion. criterion = QueryCriterion(query="event == 'A'") filtered_seqpool = simple_pool.filter(criterion, level="sequence") filtered_seqpool # Check if a single sequence matches the criterion single_seq = simple_pool["seq-0"] criterion = QueryCriterion(query="event == 'A'") matches = single_seq.match(criterion) matches # Get IDs of sequences matching the criterion criterion = QueryCriterion(query="event == 'A'") matching_ids = simple_pool.which(criterion) matching_ids ## 4. Applying a query criterion in a trajectory pool A query criterion cannot be applied directly at the trajectory level, but it can be applied to sequences or entities inside trajectories through filtering.
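One way to picture this: the criterion is evaluated on a single named sequence, and the surviving IDs can then be used to restrict the other sequences of the trajectory. The sketch below mimics that with plain Python dictionaries. It assumes that a sequence matches when at least one of its entities satisfies the condition, and that propagation keeps only matching IDs in every sequence. Both are assumptions about the semantics, and `matching_ids` and `propagate` are hypothetical helpers, not TanaT functions.

```python
# Toy trajectory data: sequence name -> {individual id -> list of entity values}
trajectories = {
    "events": {"p1": ["A", "B"], "p2": ["C", "D"], "p3": ["A"]},
    "states": {"p1": ["X"], "p2": ["Y"], "p3": ["Z"]},
}


def matching_ids(sequences, predicate):
    """IDs whose sequence contains at least one entity satisfying the predicate."""
    return {
        sid for sid, entities in sequences.items()
        if any(predicate(e) for e in entities)
    }


def propagate(trajectories, keep_ids):
    """Restrict every sequence of the trajectory data to the retained IDs."""
    return {
        name: {sid: ents for sid, ents in seqs.items() if sid in keep_ids}
        for name, seqs in trajectories.items()
    }


keep = matching_ids(trajectories["events"], lambda e: e == "A")
filtered = propagate(trajectories, keep)
```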
# Create an empty trajectory pool and add the event sequence pool trajectory_pool = TrajectoryPool.init_empty() trajectory_pool.add_sequence_pool(simple_pool, "events") # Filter sequences in the trajectory pool criterion = QueryCriterion(query="event == 'A'") filtered_trajpool = trajectory_pool.filter( criterion, level="sequence", # Specify which sequence to filter sequence_name="events", # Propagate filtered sequences to trajectory level (for multi-sequence trajectories) intersection=True, ) filtered_trajpool # Filter entities in the trajectory pool sequences criterion = QueryCriterion(query="event == 'A'") filtered_trajpool = trajectory_pool.filter( criterion, level="entity", sequence_name="events", # Specify which sequence to filter ) filtered_trajpool # Filter entities in a single trajectory criterion = QueryCriterion(query="event == 'A'") single_trajectory = trajectory_pool["seq-0"] filtered_traj = single_trajectory.filter( criterion, sequence_name="events", # Specify which sequence to filter ) filtered_traj ---------------------------------------- ## Static criterion """ Static criterion ================== Filtering sequences based on pandas-like queries applied to static data. """ ### Required imports import pandas as pd import numpy as np # Data simulation from tanat.dataset.simulation.sequence import generate_event_sequences # Sequence pools from tanat.sequence import EventSequencePool # Trajectory pools from tanat.trajectory import TrajectoryPool # Static filtering criterion from tanat.criterion import StaticCriterion ## 1. Data Setup Generate a simple sequence dataset and associated static data for filtering. 
N_SEQ = 10 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 # Generate sequences with fixed vocabulary simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) # Generate static data (e.g., demographic info) matching the sequence IDs ids = list(simple_pool.unique_ids) static_df = pd.DataFrame( { "id": ids, # age between 20 and 60 "age": np.random.randint(20, 60, size=len(ids)), # score between 0 and 100 "score": np.round(np.random.uniform(0, 100, len(ids)), 2), } ) # Attach static data to the sequence pool simple_pool.add_static_features(static_df) ## 2. Sequence level filtering Filter sequences based on static attributes. criterion = StaticCriterion(query="age > 40") filtered_seqpool = simple_pool.filter(criterion) filtered_seqpool # Check if a single sequence matches the static criterion single_seq = simple_pool["seq-0"] criterion = StaticCriterion(query="age > 40") matches = single_seq.match(criterion) matches # Get IDs of sequences matching the static criterion criterion = StaticCriterion(query="age > 40") matching_ids = simple_pool.which(criterion) matching_ids ## 3. Trajectory level filtering Filter trajectories based on static attributes linked to their sequences. 
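Static filtering can be read as a two-step operation: select IDs from the static table, then keep the temporal data of those IDs. The plain-Python sketch below follows that reading. The records and the `select_ids` helper are illustrative and not part of TanaT.

```python
# Illustrative static attributes, one record per individual
static_rows = [
    {"id": "seq-0", "age": 35},
    {"id": "seq-1", "age": 52},
    {"id": "seq-2", "age": 41},
]
# Illustrative temporal data keyed by the same IDs
events = {"seq-0": ["A", "B"], "seq-1": ["C"], "seq-2": ["A", "D"]}


def select_ids(rows, predicate):
    """Return the IDs of static records that satisfy the predicate."""
    return [row["id"] for row in rows if predicate(row)]


selected = select_ids(static_rows, lambda row: row["age"] > 40)
filtered_events = {sid: events[sid] for sid in selected}
```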
trajectory_pool = TrajectoryPool.init_empty() trajectory_pool.add_sequence_pool(simple_pool, "events") # Attach static data to the trajectory pool trajectory_pool.add_static_features(static_df, id_column="id") # Filter trajectories criterion = StaticCriterion(query="age > 40") filtered_trajpool = trajectory_pool.filter(criterion, level="trajectory") filtered_trajpool # Check if a single trajectory matches the static criterion single_traj = trajectory_pool["seq-0"] criterion = StaticCriterion(query="age > 40") matches = single_traj.match(criterion) matches # Get IDs of trajectories matching the static criterion criterion = StaticCriterion(query="age > 40") matching_ids = trajectory_pool.which(criterion) matching_ids ---------------------------------------- ## Time criterion """ Time criterion ================== Filtering sequences or entities based on temporal constraints. """ ### Required imports from datetime import datetime, timedelta # Data simulation from tanat.dataset.simulation.sequence import generate_event_sequences # Sequence pools from tanat.sequence import EventSequencePool # Trajectory pools from tanat.trajectory import TrajectoryPool # Time-based filtering criterion from tanat.criterion import TimeCriterion ## 1. Data Setup Generate a simple sequence dataset with timestamps. N_SEQ = 10 SIZE_DISTRIBUTION = [12, 15, 20, 25] # Varying sequence lengths SEED = 42 simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) simple_pool.sequence_data ## 2. Entity level filtering Filter entities (i.e., events) occurring within a specific time window. 
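The difference between entity-level and sequence-level time filtering can be sketched without the library. The example below uses fixed, illustrative dates and treats both window bounds as inclusive, which is an assumption. `in_window` is a hypothetical helper, not a TanaT function.

```python
from datetime import datetime

window_start = datetime(2024, 1, 1)
window_end = datetime(2024, 3, 31)

# Illustrative event timestamps per sequence ID
sequences = {
    "seq-0": [datetime(2024, 1, 15), datetime(2024, 2, 10)],
    "seq-1": [datetime(2023, 12, 20), datetime(2024, 1, 5)],
}


def in_window(ts):
    """True if the timestamp lies inside the window (bounds assumed inclusive)."""
    return window_start <= ts <= window_end


# Entity level: keep only the timestamps inside the window
entity_filtered = {
    sid: [ts for ts in stamps if in_window(ts)]
    for sid, stamps in sequences.items()
}

# Sequence level: keep a sequence only if every timestamp is inside the window
fully_within = [
    sid for sid, stamps in sequences.items()
    if all(in_window(ts) for ts in stamps)
]
```

Entity-level filtering shortens `seq-1` to its single in-window event, while the sequence-level check drops `seq-1` entirely because one of its events precedes the window.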
# Window: between 3 months ago and now start_date = datetime.now() - timedelta(days=90) end_date = datetime.now() time_window_criterion = TimeCriterion( start_after=start_date, end_before=end_date, ) filtered_seqpool = simple_pool.filter(time_window_criterion, level="entity") filtered_seqpool # Filter a single sequence at entity level single_seq = simple_pool["seq-1"] filtered_seq = single_seq.filter(time_window_criterion) filtered_seq ## 3. Sequence level filtering Keep only sequences where **all events** fall within the time window. time_window_criterion = TimeCriterion( start_after=start_date, end_before=end_date, # True: Sequence must be entirely contained within the time range sequence_within=True, ) filtered_seqpool = simple_pool.filter(time_window_criterion, level="sequence") filtered_seqpool # Check if a single sequence fully matches the time window single_seq = simple_pool["seq-0"] matches = single_seq.match(time_window_criterion) matches # Get IDs of sequences matching the time criterion matching_ids = simple_pool.which(time_window_criterion) matching_ids ## 4. Applying a time criterion in a trajectory pool A time criterion cannot be applied directly at the trajectory level, but it can be applied to sequences or entities inside trajectories through filtering. trajectory_pool = TrajectoryPool.init_empty() trajectory_pool.add_sequence_pool(simple_pool, "events") # Filter entities inside trajectories filtered_trajpool = trajectory_pool.filter( time_window_criterion, level="entity", sequence_name="events", # Specify which sequence to filter ) filtered_trajpool # Filter entire sequences inside trajectories filtered_trajpool = trajectory_pool.filter( time_window_criterion, level="sequence", sequence_name="events", intersection=True, # Propagate sequence filtering to trajectory level ) filtered_trajpool ---------------------------------------- ## Pattern criterion """ Pattern criterion ================== Filtering sequences or entities based on event patterns.
""" ### Required imports # Data simulation from tanat.dataset.simulation.sequence import generate_event_sequences # Sequence pools from tanat.sequence import EventSequencePool # Trajectory pools from tanat.trajectory import TrajectoryPool # Pattern-based filtering criterion from tanat.criterion import PatternCriterion ## 1. Data Setup Generate a simple sequence dataset for pattern-based filtering. N_SEQ = 10 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) ## 2. Entity level filtering Filter entities that are part of a specific event pattern. # Example: look for sequences containing the pattern A → B → C pattern_criterion = PatternCriterion( pattern={"event": ["A", "B", "C"]}, contains=True, # Match anywhere in the sequence (not necessarily the full sequence) ) filtered_seqpool = simple_pool.filter(pattern_criterion, level="entity") filtered_seqpool # Filter a single sequence at entity level single_seq = simple_pool["seq-5"] filtered_seq = single_seq.filter(pattern_criterion) filtered_seq ## 3. Sequence level filtering Keep only sequences that match the full pattern. filtered_seqpool = simple_pool.filter(pattern_criterion, level="sequence") filtered_seqpool # Check if a single sequence matches the pattern single_seq = simple_pool["seq-5"] matches = single_seq.match(pattern_criterion) matches # Get IDs of sequences matching the pattern matching_ids = simple_pool.which(pattern_criterion) matching_ids ## 4. Applying a pattern criterion in a trajectory pool A pattern criterion cannot be applied directly at the trajectory level, but it can be applied to sequences or entities inside trajectories through filtering.
trajectory_pool = TrajectoryPool.init_empty() trajectory_pool.add_sequence_pool(simple_pool, "events") # Filter entities in trajectories based on pattern filtered_trajpool = trajectory_pool.filter( pattern_criterion, level="entity", sequence_name="events", # Specify which sequence to filter ) filtered_trajpool # Filter full sequences in trajectories based on pattern filtered_trajpool = trajectory_pool.filter( pattern_criterion, level="sequence", sequence_name="events", intersection=True, # Propagate filtered sequences to the trajectory level ) filtered_trajpool ---------------------------------------- ## Length criterion """ Length criterion ================== Filtering sequences based on their length. """ ### Required imports # Data simulation from tanat.dataset.simulation.sequence import generate_event_sequences # Sequence pools from tanat.sequence import EventSequencePool # Trajectory pools from tanat.trajectory import TrajectoryPool # Length-based filtering criterion from tanat.criterion import LengthCriterion ## 1. Data Setup Generate a simple sequence dataset with varying lengths. N_SEQ = 10 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) ## 2. Sequence level filtering Keep only sequences whose length satisfies the criterion. 
# Example: keep sequences with length strictly greater than 5
length_criterion = LengthCriterion(gt=5)

filtered_seqpool = simple_pool.filter(length_criterion)
filtered_seqpool

# Check if a single sequence matches the length criterion
single_seq = simple_pool["seq-0"]
matches = single_seq.match(length_criterion)
matches

# Get IDs of sequences matching the length criterion
matching_ids = simple_pool.which(length_criterion)
matching_ids

## 3. Applying length criterion in a trajectory pool

Length criteria are applied at the sequence level inside trajectory pools.

trajectory_pool = TrajectoryPool.init_empty()
trajectory_pool.add_sequence_pool(simple_pool, "events")

# Filter sequences in trajectories based on length
filtered_trajpool = trajectory_pool.filter(
    length_criterion,
    level="sequence",
    sequence_name="events",  # Specify which sequence to filter
)
filtered_trajpool

----------------------------------------

## Rank criterion

"""
Rank criterion
==================

Filter entities based on their rank or position.
"""

This example demonstrates filtering entities based on their position/rank in sequences using both the direct criterion API and convenient helper methods (head, tail, slice).

### Required imports

# Data simulation
from tanat.dataset.simulation.sequence import generate_event_sequences

# Sequence pools
from tanat.sequence import EventSequencePool

# Trajectory pools
from tanat.trajectory import TrajectoryPool

# Rank-based filtering criterion
from tanat.criterion import RankCriterion

## 1. Data Setup

Generate a simple sequence dataset to demonstrate rank-based filtering.
N_SEQ = 10 SIZE_DISTRIBUTION = [8, 9, 10, 11, 12] SEED = 42 simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) simple_pool ## 2. Helper Methods (Recommended) The easiest way to use rank-based filtering is through the helper methods: ``head()``, ``tail()``, and ``slice()``. These methods provide a clean, intuitive API for position-based selection. ### Using head() - Get first N entities # Get first 5 entities from each sequence first_5 = simple_pool.head(5) first_5 # Negative values: get all EXCEPT last N entities all_but_last_2 = simple_pool.head(-2) all_but_last_2 ### Using tail() - Get last N entities # Get last 3 entities from each sequence last_3 = simple_pool.tail(3) last_3 # Negative values: get all EXCEPT first N entities all_but_first_2 = simple_pool.tail(-2) all_but_first_2 ### Using slice() - Position range with optional step # Select entities from position 2 to 7 middle_entities = simple_pool.slice(start=2, end=7) middle_entities # Sample every 2nd entity every_second = simple_pool.slice(step=2) every_second # Combine: positions 1 to 8, every 2nd entity sampled_range = simple_pool.slice(start=1, end=8, step=2) sampled_range # Using negative indices (Python-style) last_five = simple_pool.slice(start=-5, end=None) last_five ## 3. Direct RankCriterion API (Advanced) For more complex scenarios or when you need programmatic control, you can use the RankCriterion directly. 
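The positional conventions used on this page (0-based ranks, negative values counted from the end, an optional step) are the same ones Python lists use. The sketch below reproduces them on a plain list so the expected selections are easy to check by hand. It does not call TanaT: the function names are illustrative, and treating `end` as exclusive is an assumption borrowed from Python slicing rather than something this page states.

```python
# Plain-Python illustration of rank-based selection (not TanaT code).
events = ["A", "B", "C", "D", "E", "F", "G", "H"]


def head(items, n):
    """First n items; a negative n keeps everything except the last |n|."""
    return items[:n]


def tail(items, n):
    """Last n items; a negative n keeps everything except the first |n|."""
    if n == 0:
        return []
    return items[-n:]


def select_ranks(items, ranks):
    """Pick explicit positions; negative ranks count from the end."""
    return [items[r] for r in ranks]


print(head(events, 5))                      # first five entities
print(head(events, -2))                     # all but the last two
print(tail(events, 3))                      # last three entities
print(tail(events, -2))                     # all but the first two
print(events[1:8:2])                        # start=1, end=8 (exclusive), step=2
print(select_ranks(events, [-1, -2, -3]))   # last, second to last, third to last
```

If TanaT's `end` bound turns out to be inclusive, only the slice example changes; the head, tail and explicit-rank cases are unaffected.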
### Using first/last parameters # Get first 5 entities using criterion rank_criterion = RankCriterion(first=5) filtered_pool = simple_pool.filter(rank_criterion, level="entity") filtered_pool # Get last 3 entities using criterion rank_criterion = RankCriterion(last=3) filtered_pool = simple_pool.filter(rank_criterion, level="entity") filtered_pool ### Using start/end parameters # Select entities from position 2 to 7 rank_criterion = RankCriterion(start=2, end=7) filtered_pool = simple_pool.filter(rank_criterion, level="entity") filtered_pool # Add step for sampling rank_criterion = RankCriterion(start=0, end=10, step=2) filtered_pool = simple_pool.filter(rank_criterion, level="entity") filtered_pool ### Using specific ranks # Select specific positions (0-based indexing) rank_criterion = RankCriterion(ranks=[0, 2, 4, 6]) filtered_pool = simple_pool.filter(rank_criterion, level="entity") filtered_pool # Negative ranks (from end: -1 = last, -2 = second to last) rank_criterion = RankCriterion(ranks=[-1, -2, -3]) filtered_pool = simple_pool.filter(rank_criterion, level="entity") filtered_pool ## 4. Single Sequence Operations Helper methods work seamlessly with individual sequences. single_seq = simple_pool["seq-0"] print(f"Original sequence length: {len(single_seq)}") # Head method on single sequence first_3 = single_seq.head(3) print(f"After head(3): {len(first_3)} entities") first_3 # Tail method on single sequence last_4 = single_seq.tail(4) print(f"After tail(4): {len(last_4)} entities") last_4 # Slice method on single sequence middle = single_seq.slice(start=2, end=6) print(f"After slice(start=2, end=6): {len(middle)} entities") middle # Python-style indexing also works! first_entity = single_seq[0] last_entity = single_seq[-1] sliced = single_seq[1:5:2] # start:end:step print(f"Python indexing [1:5:2]: {len(sliced)} entities") ## 5. Trajectory Operations Helper methods support trajectory-specific operations with the ``sequence_name`` parameter. 
trajectory_pool = TrajectoryPool.init_empty() trajectory_pool.add_sequence_pool(simple_pool, "events") trajectory_pool ### Targeting specific sequences # Get first 5 entities from 'events' sequence filtered_traj = trajectory_pool.head(5, sequence_name="events") filtered_traj # Slice with step on specific sequence sampled_traj = trajectory_pool.slice(start=0, end=8, step=2, sequence_name="events") sampled_traj ### Applying to all sequences # When sequence_name is None (default), applies to ALL sequences all_sequences_head = trajectory_pool.head(4) all_sequences_head ### Using direct criterion API for trajectories # For programmatic control, use filter with RankCriterion rank_criterion = RankCriterion(first=6) filtered_traj = trajectory_pool.filter( rank_criterion, level="entity", sequence_name="events" ) filtered_traj ## 6. Relative Mode (T0-based positioning) RankCriterion supports relative mode for T0-aligned sequences. This is useful when working with temporal reference points. # Set T0 for sequences (using third entity as reference) pool_with_t0 = simple_pool.copy() pool_with_t0.zero_from_position(3) # Use relative ranks (relative to T0 entity) rank_criterion = RankCriterion(start=-2, end=3, relative=True) relative_filtered = pool_with_t0.filter(rank_criterion, level="entity") relative_filtered # Slice method also supports relative mode (positions relative to T0) relative_filtered = pool_with_t0.slice(start=-2, end=3, relative=True) relative_filtered ---------------------------------------- ## Hamming metric """ Hamming metric ================== Compute the Hamming distance between two entities. 
""" ### Required imports # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, ) # Entity metrics from tanat.metric.entity import ( HammingEntityMetric, HammingEntityMetricSettings, ) ## Data Setup Let's create a simple sequence data to demonstrate different metric capabilities. N_SEQ = 10 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 # Generate simple sequences for clear metric demonstration simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) simple_pool ## Hamming Entity Metric Entity metrics compare individual elements within sequences. They form the foundation for sequence-level comparisons. By default, the Hamming metric returns 0 for identical elements and 1 for different elements. # Create Hamming entity metric with default settings settings = HammingEntityMetricSettings() hamming_metric = HammingEntityMetric(settings=settings) # -- Settings overview hamming_metric # Access first entity from seq-0 entity_a = simple_pool["seq-0"][0] # Access first entity from seq-1 entity_b = simple_pool["seq-1"][0] # Compute distance between two entities print(f"Distance between: {entity_a.value} and {entity_b.value}") hamming_metric(entity_a, entity_b) Before computing the metric, you can customize its behavior using `update_settings()` or `kwargs`. In the case of the Hamming metric, providing a cost_dict allows you to control the cost associated with specific comparisons between entity values. 
# Preconfigure the cost dictionary using update_settings hamming_metric.update_settings( cost={ ("A", "B"): 0, ("B", "C"): 1, ("C", "D"): 2, ("A", "C"): 3, ("A", "D"): 4, ("B", "D"): 5, }, default_value=-2, # Fallback cost for any unspecified comparison ) hamming_metric(entity_a, entity_b) # Provide the cost dictionary directly as kwargs hamming_metric( entity_a, entity_b, cost={ ("A", "B"): 0, ("B", "C"): 1, ("C", "D"): 2, ("A", "C"): 3, ("A", "D"): 4, ("B", "D"): 5, }, default_value=10, # Fallback cost for any unspecified comparison ) ---------------------------------------- ## Create Custom Metric """ Create Custom Metric ============================ Learn how to create a custom entity metric and use it with sequence metrics. """ ### Required imports # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, ) ## Data Setup Let's create a simple sequence data to demonstrate custom entity metrics. N_SEQ = 10 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 # Generate simple sequences for clear metric demonstration simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D", "E", "F", "G"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) simple_pool ## Custom Entity Metric Entity metrics compare individual elements within sequences. Let's create a custom metric that computes an "alphabetical distance" between two categorical values (e.g., distance between "A" and "C" is 2). 
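Before wiring this idea into TanaT's classes, the distance itself can be checked in isolation. The sketch below is plain Python with no TanaT dependency; the function name `alphabetical_distance` is illustrative, and the divisor 25 assumes values start with a letter in A-Z, as the implementation that follows also assumes.

```python
# Standalone version of the "alphabetical distance" idea (illustrative name).
def alphabetical_distance(a, b, normalize=False):
    """Distance between the first letters of two values, case-insensitive."""
    char_a = str(a)[0].upper()
    char_b = str(b)[0].upper()
    dist = abs(ord(char_a) - ord(char_b))
    if normalize:
        return dist / 25.0  # 25 = distance between "A" and "Z"
    return float(dist)


print(alphabetical_distance("A", "C"))                  # two letters apart
print(alphabetical_distance("g", "A"))                  # case is ignored
print(alphabetical_distance("A", "Z", normalize=True))  # maximum distance
```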
import numpy as np
from numba import njit
from numba.typed import List as NumbaList
from pydantic.dataclasses import dataclass

from tanat.metric.entity.base.metric import EntityMetric
from tanat.metric.entity.base.settings import BaseEntityMetricSettings


# Define the Numba kernel
@njit
def _alphabetical_kernel(val_a, val_b, context):
    """Compute alphabetical distance between encoded values."""
    dist = abs(val_a - val_b)
    normalize = context[0]  # 1 = normalize, 0 = raw
    if normalize:
        return dist / 25.0
    return float(dist)


@dataclass
class AlphabeticalDistanceSettings(BaseEntityMetricSettings):
    """Settings for the alphabetical distance metric.

    Note: Inherits entity_features from BaseEntityMetricSettings.
    """

    normalize: bool = False  # If True, normalize by max possible distance


class AlphabeticalEntityMetric(EntityMetric, register_name="alphabetical"):
    """
    Metric that computes the alphabetical distance between two entity values.

    For example: distance("A", "C") = 2 (two letters apart)
    """

    SETTINGS_DATACLASS = AlphabeticalDistanceSettings

    def __init__(self, settings=None):
        if settings is None:
            settings = AlphabeticalDistanceSettings()
        super().__init__(settings)

    def _compute_single_distance(self, ent_a, ent_b):
        """
        Compute alphabetical distance between two entities.

        Note: Entity types and feature types are already validated by the
        base class before this method is called (via __call__).

        Args:
            ent_a (Entity): First entity.
            ent_b (Entity): Second entity.

        Returns:
            float: The alphabetical distance.
""" # Get value using entity_features from metric settings val_a = ent_a.get_value(self.entity_features) val_b = ent_b.get_value(self.entity_features) # Get first character of each value (assuming string values) char_a = str(val_a)[0].upper() char_b = str(val_b)[0].upper() # Compute distance as difference in ASCII values dist = abs(ord(char_a) - ord(char_b)) if self._settings.normalize: # Normalize by max distance (25 for A-Z) return dist / 25.0 return float(dist) def prepare_computation_data(self, sequence_array): """ Prepare data for Numba computation. Needed for SequenceMetric compatibility. Allows efficient use of this entity metric within sequence-level metrics. Returns a NumbaList of encoded arrays for efficient Numba processing. and a context tuple with normalization flag. """ encoded_arrays = NumbaList() for arr in sequence_array.data: # Encode first character of each value as integer (A=0, B=1, ...) encoded = np.array( [ord(str(val)[0].upper()) - ord("A") for val in arr], dtype=np.int32 ) encoded_arrays.append(encoded) # Context: tuple with normalization flag (1 = normalize, 0 = raw) context = (int(self._settings.normalize),) return encoded_arrays, context @property def distance_kernel(self): """ Return the Numba-compiled distance function. Used for efficient computation at sequence metric level. """ return _alphabetical_kernel def validate_feature_types(self, feature_types): """ Entity metric constraint over feature types. Here we validate that features are categorical or textual (for alphabetical comparison). Use by __call__ or at sequence metric level before computation. """ for ftype in feature_types: if ftype not in ("categorical", "textual"): raise ValueError( f"AlphabeticalDistanceMetric requires categorical or textual features, " f"got '{ftype}'" ) ### Test the Custom Entity Metric Let's test our custom metric on individual entities. 
# Access entities from sequences entity_a = simple_pool["seq-0"][0] entity_b = simple_pool["seq-1"][0] print(f"Entity A: {entity_a.value}") print(f"Entity B: {entity_b.value}") # Create and test custom metric custom_metric = AlphabeticalEntityMetric() custom_metric # Compute distance between two entities distance = custom_metric(entity_a, entity_b) print( f"Alphabetical distance between '{entity_a.value}' and '{entity_b.value}': {distance}" ) # Test with normalization distance_normalized = custom_metric(entity_a, entity_b, normalize=True) print(f"Normalized distance: {distance_normalized}") ## Use Custom Entity Metric with Sequence Metrics The real power of custom entity metrics is using them as building blocks for sequence-level comparisons. Let's use our metric with `LinearPairwiseSequenceMetric`. from tanat.metric.sequence import ( LinearPairwiseSequenceMetric, LinearPairwiseSequenceMetricSettings, ) # Create LinearPairwise metric using our custom entity metric linear_settings = LinearPairwiseSequenceMetricSettings( entity_metric=custom_metric, agg_fun="sum", ) linear_metric = LinearPairwiseSequenceMetric(settings=linear_settings) linear_metric # Compute distance between two sequences using our custom entity metric seq_0 = simple_pool["seq-0"] seq_1 = simple_pool["seq-1"] print(f"Sequence 0: {[e.value for e in seq_0]}") print(f"Sequence 1: {[e.value for e in seq_1]}") distance = linear_metric(seq_0, seq_1) print(f"LinearPairwise distance with AlphabeticalEntityMetric: {distance:.4f}") # Compute full distance matrix using our custom entity metric dm = linear_metric.compute_matrix(simple_pool) print("Distance Matrix with AlphabeticalEntityMetric:") dm.to_dataframe() ---------------------------------------- ## Dynamic Time Warping """ Dynamic Time Warping ================== Compute the dynamic time warping (DTW) distance between two sequences. 
""" ### Required imports # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, ) # Sequence Metrics from tanat.metric.sequence import ( DTWSequenceMetric, DTWSequenceMetricSettings, ) ## Data Setup Let's create a simple sequence data to demonstrate the DTW metric. N_SEQ = 1000 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 # Generate simple sequences for clear metric demonstration simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) simple_pool ### Dynamic Time Warping (DTW) DTW allows flexible alignment between sequences of different lengths. # Create DTW metric settings = DTWSequenceMetricSettings() dtw_metric = DTWSequenceMetric(settings=settings) # -- Settings overview dtw_metric # Access two simple sequences seq_0 = simple_pool["seq-0"] seq_1 = simple_pool["seq-1"] # Compute DTW distance dtw_metric(seq_0, seq_1) # Compute DTW directly on sequence pool dm = dtw_metric.compute_matrix(simple_pool) dm.to_dataframe().head() Before computing the metric, you can customize its behavior using `update_settings()` or `kwargs`. # Preconfigure the the behavior of the metric dtw_metric.update_settings( window=2, # band constraint = 2 ) dm = dtw_metric.compute_matrix(simple_pool) dm.to_dataframe().head() # Modify the behavior directly from kwargs dm = dtw_metric.compute_matrix( simple_pool, window=2, # band constraint = 2 ) dm.to_dataframe().head() ---------------------------------------- ## Edit Distance """ Edit Distance ================== Compute the edit distance between two sequences. 
""" ### Required imports # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, ) # Sequence Metrics from tanat.metric.sequence import ( EditSequenceMetric, EditSequenceMetricSettings, ) ## Data Setup Let's create a simple sequence data to demonstrate the Edit metric. N_SEQ = 1000 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 # Generate simple sequences for clear metric demonstration simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) simple_pool ### Edit Distance Measures the minimum number of operations needed to transform one sequence into another. # Create edit distance metric settings = EditSequenceMetricSettings() edit_metric = EditSequenceMetric(settings=settings) # Settings overview edit_metric # Access two simple sequences seq_0 = simple_pool["seq-0"] seq_1 = simple_pool["seq-1"] # Compute edit distance edit_metric(seq_0, seq_1) # Compute Edit distance directly on sequence pool dm = edit_metric.compute_matrix(simple_pool) dm.to_dataframe().head() Before computing the metric, you can customize its behavior using `update_settings()` or `kwargs`. # Preconfigure the indel cost using update_settings edit_metric.update_settings( indel_cost=2.0, # Double the indel cost ) dm = edit_metric.compute_matrix(simple_pool) dm.to_dataframe().head() # Modify behavior directly from kwargs dm = edit_metric.compute_matrix( simple_pool, indel_cost=2.0, # Double the indel cost ) dm.to_dataframe().head() ---------------------------------------- ## Longest Common Prefix """ Longest Common Prefix ================== Compute the longest common prefix (LCP) between two sequences. 
""" ### Required imports # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, ) # Sequence Metrics from tanat.metric.sequence import ( LCPSequenceMetric, LCPSequenceMetricSettings, ) ## Data Setup Let's create a simple sequence data to demonstrate the LCP metric. N_SEQ = 1000 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 # Generate simple sequences for clear metric demonstration simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) simple_pool ### Longest Common Prefix (LCP) Focuses on similarity at the beginning of sequences. # Create LCP metric settings = LCPSequenceMetricSettings() lcp_metric = LCPSequenceMetric(settings=settings) # -- Settings overview lcp_metric # Access two simple sequences seq_0 = simple_pool["seq-0"] seq_1 = simple_pool["seq-1"] # Compute LCP distance lcp_metric(seq_0, seq_1) # Compute LCP distance directly on sequence pool dm = lcp_metric.compute_matrix(simple_pool) dm.to_dataframe().head() Before computing the metric, you can customize its behavior using `update_settings()` or `kwargs`. # Preconfigure the behavior using update_settings lcp_metric.update_settings( as_distance=True, normalize=False, ) dm = lcp_metric.compute_matrix(simple_pool) dm.to_dataframe().head() # Modify behavior directly from kwargs dm = lcp_metric.compute_matrix( simple_pool, as_distance=True, normalize=False, ) dm.to_dataframe().head() ---------------------------------------- ## Longest Common Subsequence """ Longest Common Subsequence ================== Compute the longest common subsequence (LCS) between two sequences. 
""" ### Required imports # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, ) # Sequence Metrics from tanat.metric.sequence import ( LCSSequenceMetric, LCSSequenceMetricSettings, ) ## Data Setup Let's create a simple sequence data to demonstrate the LCS metric. N_SEQ = 1000 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 # Generate simple sequences for clear metric demonstration simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) simple_pool ### Longest Common Subsequence (LCS) Measures similarity based on the longest common subsequence between sequences. # Create LCS metric settings = LCSSequenceMetricSettings() lcs_metric = LCSSequenceMetric(settings=settings) # -- Settings overview lcs_metric # Access two simple sequences seq_0 = simple_pool["seq-0"] seq_1 = simple_pool["seq-1"] # Compute LCS distance lcs_metric(seq_0, seq_1) # Compute LCS distance directly on sequence pool dm = lcs_metric.compute_matrix(simple_pool) dm.to_dataframe().head() Before computing the metric, you can customize its behavior using `update_settings()` or `kwargs`. # Preconfigure the behavior using update_settings lcs_metric.update_settings( as_distance=True, normalize=False, ) dm = lcs_metric.compute_matrix(simple_pool) dm.to_dataframe().head() # Modify behavior directly from kwargs dm = lcs_metric.compute_matrix( simple_pool, as_distance=True, normalize=False, ) dm.to_dataframe().head() ---------------------------------------- ## Linear Pairwise Sequence Metric """ Linear Pairwise Sequence Metric ================== Compute the linear pairwise distance between two sequences. 
""" ### Required imports # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, ) # Sequence Metrics from tanat.metric.sequence import ( LinearPairwiseSequenceMetric, LinearPairwiseSequenceMetricSettings, ) ## Data Setup Let's create a simple sequence data to demonstrate the linear pairwise metric. N_SEQ = 1000 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 # Generate simple sequences for clear metric demonstration simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) simple_pool ### Linear Pairwise Metric Compares sequences element-by-element using an underlying entity metric (default: Hamming with default settings). # Init linear metric with default settings settings = LinearPairwiseSequenceMetricSettings() linear_metric = LinearPairwiseSequenceMetric(settings=settings) # -- Settings overview linear_metric # Access two simple sequences seq_0 = simple_pool["seq-0"] seq_1 = simple_pool["seq-1"] # Compute linear pairwise distance linear_metric(seq_0, seq_1) # Compute linear pairwise directly on sequence pool dm = linear_metric.compute_matrix(simple_pool) dm.to_dataframe().head() Before computing the metric, you can customize its behavior using `update_settings()` or `kwargs`. 
# Preconfigure the aggregation function using update_settings linear_metric.update_settings( agg_fun="sum", # Use sum aggregation instead of default mean ) dm = linear_metric.compute_matrix(simple_pool) dm.to_dataframe().head() # Provide the aggregation function directly as kwargs dm = linear_metric.compute_matrix( simple_pool, agg_fun="sum", # Use sum aggregation instead of default mean ) dm.to_dataframe().head() ---------------------------------------- ## Soft Dynamic Time Warping """ Soft Dynamic Time Warping ================== Compute the soft dynamic time warping distance (soft DTW) between two sequences. """ ### Required imports # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, ) # Sequence Metrics from tanat.metric.sequence import ( SoftDTWSequenceMetric, SoftDTWSequenceMetricSettings, ) ## Data Setup Let's create a simple sequence data to demonstrate the soft DTW metric. N_SEQ = 10 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 # Generate simple sequences for clear metric demonstration simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) simple_pool ### Soft Dynamic Time Warping A differentiable version of DTW that provides smoother distance calculations. 
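To show what "differentiable" means here, the sketch below implements the standard soft-DTW recursion, in which the hard minimum of classic DTW is replaced by a smooth minimum controlled by `gamma`. It is a plain-Python illustration under stated assumptions, not TanaT's implementation: the 0/1 mismatch cost stands in for an entity metric, and the function names are illustrative.

```python
import math


def soft_min(values, gamma):
    """Smooth minimum: -gamma * log(sum(exp(-v / gamma)))."""
    m = min(values)
    if math.isinf(m):
        return m
    # Shift by the true minimum for numerical stability.
    return m - gamma * math.log(sum(math.exp(-(v - m) / gamma) for v in values))


def soft_dtw(seq_a, seq_b, gamma=1.0, dist=lambda x, y: 0.0 if x == y else 1.0):
    """Soft-DTW value between two sequences (assumed 0/1 mismatch cost)."""
    n, m = len(seq_a), len(seq_b)
    # r[i][j] holds the soft alignment cost of the first i and j elements.
    r = [[math.inf] * (m + 1) for _ in range(n + 1)]
    r[0][0] = 0.0
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            cost = dist(seq_a[i - 1], seq_b[j - 1])
            r[i][j] = cost + soft_min(
                (r[i - 1][j], r[i][j - 1], r[i - 1][j - 1]), gamma
            )
    return r[n][m]


print(soft_dtw("ABC", "ABD", gamma=0.01))  # close to the classic DTW value
print(soft_dtw("ABC", "ABD", gamma=1.0))   # smoother, lower value
```

As `gamma` approaches 0 the smooth minimum approaches the hard minimum, so the result approaches classic DTW; larger values of `gamma` blend in the cost of alternative alignments.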
# Create Soft DTW metric
settings = SoftDTWSequenceMetricSettings()
soft_dtw_metric = SoftDTWSequenceMetric(settings=settings)

# -- Settings overview
soft_dtw_metric

# Access two simple sequences
seq_0 = simple_pool["seq-0"]
seq_1 = simple_pool["seq-1"]

# Compute soft DTW distance
soft_dtw_metric(seq_0, seq_1)

# Compute soft DTW distance directly on sequence pool
dm = soft_dtw_metric.compute_matrix(simple_pool)
dm.to_dataframe().head()

Before computing the metric, you can customize its behavior using `update_settings()` or `kwargs`.

# Preconfigure gamma
soft_dtw_metric.update_settings(
    gamma=0.5,  # Reduce gamma
)
dm = soft_dtw_metric.compute_matrix(simple_pool)
dm.to_dataframe().head()

# Modify gamma directly from kwargs
dm = soft_dtw_metric.compute_matrix(
    simple_pool,
    gamma=0.5,  # Reduce gamma
)
dm.to_dataframe().head()

----------------------------------------

## Chi-Squared Distance

"""
Chi-Squared Distance
====================

Compute the chi-squared distance between two sequences based on state distributions.
"""

### Required imports

from datetime import datetime

# Data simulation
from tanat.dataset.simulation.sequence import (
    generate_state_sequences,
)

# Sequence pools
from tanat.sequence import (
    StateSequencePool,
)

# Sequence Metrics
from tanat.metric.sequence import (
    Chi2SequenceMetric,
    Chi2SequenceMetricSettings,
)

## Data Setup

Let's create state sequences to demonstrate the Chi2 metric. Chi2 compares the proportion of time spent in each state.
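The idea of comparing time spent per state can be sketched without TanaT. The example below turns a list of `(state, duration)` spells into proportions and compares two such distributions with one common symmetric chi-squared form, the sum of `(p - q)**2 / (p + q)` over states. The exact normalization used by `Chi2SequenceMetric` is not specified here, so treat the formula and the function names as assumptions.

```python
from collections import defaultdict


def state_proportions(spells):
    """Share of total time spent in each state, from (state, duration) pairs."""
    totals = defaultdict(float)
    for state, duration in spells:
        totals[state] += duration
    total = sum(totals.values())
    return {state: value / total for state, value in totals.items()}


def chi2_distance(p, q):
    """Symmetric chi-squared distance between two state distributions."""
    distance = 0.0
    for state in set(p) | set(q):
        p_s, q_s = p.get(state, 0.0), q.get(state, 0.0)
        if p_s + q_s > 0:
            distance += (p_s - q_s) ** 2 / (p_s + q_s)
    return distance


# Durations are in arbitrary units (for example days).
seq_a = [("healthy", 6), ("sick", 2), ("healthy", 2)]
seq_b = [("sick", 2), ("healthy", 8)]   # same time per state, different order
seq_c = [("sick", 10)]

p_a, p_b, p_c = map(state_proportions, (seq_a, seq_b, seq_c))
print(chi2_distance(p_a, p_b))  # temporal order is ignored
print(chi2_distance(p_a, p_c))
```

For event sequences, the same sketch applies with every duration set to 1, so that each event counts as one unit.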
N_SEQ = 100
SIZE_DISTRIBUTION = [5, 6, 7, 8, 9, 10]
SEED = 42

# Generate state sequences
state_data = generate_state_sequences(
    n_seq=N_SEQ,
    seq_size=SIZE_DISTRIBUTION,
    vocabulary=["healthy", "sick", "recovered"],
    missing_data=0.0,
    entity_feature="status",
    seed=SEED,
)
print(state_data)

state_settings = {
    "id_column": "id",
    "start_column": "start_date",
    "entity_features": ["status"],
    # Avoid warning for last state
    "default_end_value": datetime.now(),
}

state_pool = StateSequencePool(state_data, state_settings)
state_pool

### Chi-Squared Distance

Chi2 compares **state distributions** (ignoring temporal order). It measures how different the time spent in each state is between sequences.

# Create Chi2 metric
settings = Chi2SequenceMetricSettings(
    entity_features=["status"],
)
chi2_metric = Chi2SequenceMetric(settings=settings)

# -- Settings overview
chi2_metric

# Access two sequences
seq_0 = state_pool["seq-0"]
seq_1 = state_pool["seq-1"]

# Compute Chi2 distance
chi2_metric(seq_0, seq_1)

### Key difference from other metrics

Chi2 does **not** use an `entity_metric`. It directly computes state distributions.

- For **StateSequence**: uses actual durations
- For **EventSequence**: each event counts as 1 unit

# Compute Chi2 on the full pool
dm = chi2_metric.compute_matrix(state_pool)
dm.to_dataframe().head()

----------------------------------------

## Create Custom Metric

"""
Create Custom Metric
==================

Create a custom sequence metric.
"""

### Required imports

# Data simulation
from tanat.dataset.simulation.sequence import (
    generate_event_sequences,
)

# Sequence pools
from tanat.sequence import (
    EventSequencePool,
)

## Data Setup

Let's create simple sequence data to demonstrate a custom sequence metric.
N_SEQ = 100
SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12]
SEED = 42

# Generate simple sequences for clear metric demonstration
simple_data = generate_event_sequences(
    n_seq=N_SEQ,
    seq_size=SIZE_DISTRIBUTION,
    vocabulary=["A", "B", "C", "D"],
    missing_data=0.0,
    entity_feature="event",
    seed=SEED,
)

simple_settings = {
    "id_column": "id",
    "time_column": "date",
    "entity_features": ["event"],
}

simple_pool = EventSequencePool(simple_data, simple_settings)
simple_pool

### Custom Sequence Metric

Define a custom sequence metric that simply calculates the length difference between two sequences.

**Minimal implementation**: Override `_compute_single_distance(seq_a, seq_b)` to define your metric logic.

**Performance optimization**: For large pools, you can also override `_compute_distances(dm, sequence_pool)` to use vectorized operations or Numba JIT compilation (see DTW, LCS implementations for examples).

# Create a custom sequence metric
from pydantic.dataclasses import dataclass, Field
from tanat.metric.sequence.base.metric import SequenceMetric
from tanat.metric.matrix import MatrixStorageOptions


@dataclass
class SimpleLengthSettings:
    """Settings for the length metric.

    Note: We don't inherit from BaseSequenceMetricSettings since this metric
    doesn't use an entity_metric. We only declare distance_matrix which is
    required for compute_matrix() support.
""" absolute: bool = True # If True, returns absolute value of the difference # Required for compute_matrix() support distance_matrix: MatrixStorageOptions = Field(default_factory=MatrixStorageOptions) class SimpleLengthMetric(SequenceMetric, register_name="length"): """Metric that simply calculates the length difference between two sequences.""" SETTINGS_DATACLASS = SimpleLengthSettings def __init__(self, settings=None): if settings is None: settings = SimpleLengthSettings() super().__init__(settings) def _compute_single_distance(self, seq_a, seq_b): """Calculate the length difference between two sequences.""" len_a = len(seq_a.sequence_data) len_b = len(seq_b.sequence_data) difference = len_a - len_b if self._settings.absolute: return abs(difference) return difference # Access two simple sequences seq_0 = simple_pool["seq-0"] seq_1 = simple_pool["seq-1"] # Test custom metric custom_metric = SimpleLengthMetric() custom_metric(seq_0, seq_1) dm = custom_metric.compute_matrix(simple_pool) dm.to_dataframe().head() ---------------------------------------- ## Aggregation Trajectory Metric """ Aggregation Trajectory Metric ================== Compute the aggregated distance between two trajectories. """ ### Required imports from datetime import datetime # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, generate_state_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, StateSequencePool, ) from tanat.trajectory import TrajectoryPool # Sequence Metrics from tanat.metric.trajectory import ( AggregationTrajectoryMetric, AggregationTrajectoryMetricSettings, ) ## Data Setup Let's create a simple sequence data to demonstrate the aggregation metric. 
N_SEQ = 1000 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 # Generate simple sequences for clear metric demonstration simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) simple_pool # Generate another set of simple sequences (states) simple_data = generate_state_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["Z", "Y", "X", "W"], missing_data=0.0, entity_feature="states", seed=SEED, ) simple_settings = { "id_column": "id", "start_column": "start_date", "default_end_value": datetime.now(), # Avoid warning "entity_features": ["states"], } simple_pool_2 = StateSequencePool(simple_data, simple_settings) simple_pool_2 # Build trajectory pool trajectory_pool = TrajectoryPool.init_empty() trajectory_pool.add_sequence_pool(simple_pool, "events") trajectory_pool.add_sequence_pool(simple_pool_2, "states") # Configure settings trajectory_pool.update_settings(intersection=False) ### Aggregation Trajectory Metric Aggregation trajectory metric computes a distance between two trajectories. # Create aggregation metric settings = AggregationTrajectoryMetricSettings() mean_agg_metric = AggregationTrajectoryMetric(settings=settings) # -- Settings overview mean_agg_metric By default the aggregation metric computes linear pairwise distance between sequences before aggregating. Mean aggregation is used as default. 
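As a library-independent illustration of the aggregation step described above, the sketch below combines per-sequence distances into a single trajectory distance. The function name `aggregate_trajectory_distance` and the example distance values are hypothetical; this is not TanaT's implementation, only a minimal picture of what "mean aggregation" means.

```python
from statistics import mean


def aggregate_trajectory_distance(seq_distances, agg=mean):
    """Combine per-sequence distances into one trajectory distance.

    seq_distances maps a sequence name (e.g. "events") to the distance
    between the two trajectories' sequences of that name.
    This helper is hypothetical and only illustrates the idea.
    """
    if not seq_distances:
        raise ValueError("at least one sequence distance is required")
    return agg(seq_distances.values())


# Hypothetical per-sequence distances between two trajectories
distances = {"events": 4.0, "states": 2.0}

mean_distance = aggregate_trajectory_distance(distances)          # mean of 4.0 and 2.0
max_distance = aggregate_trajectory_distance(distances, agg=max)  # alternative aggregation
print(mean_distance, max_distance)
```

Swapping the `agg` callable shows how the choice of aggregation changes the result while the per-sequence distances stay the same.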
# Access two simple trajectories traj_1 = trajectory_pool["seq-0"] traj_2 = trajectory_pool["seq-1"] # Compute aggregated distance mean_agg_metric(traj_1, traj_2) # Compute mean aggregation directly on the trajectory pool dm = mean_agg_metric.compute_matrix(trajectory_pool) dm.to_dataframe().head() Before computing the metric, you can customize its behavior using `update_settings()` or keyword arguments. # Preconfigure the behavior of the metric mean_agg_metric.update_settings( # Compute DTW distance before aggregating metric_mapper={"default_metric": "dtw"}, ) dm = mean_agg_metric.compute_matrix(trajectory_pool) dm.to_dataframe().head() # Modify the behavior directly from kwargs dm = mean_agg_metric.compute_matrix( trajectory_pool, # Compute DTW distance before aggregating metric_mapper={"default_metric": "dtw"}, ) dm.to_dataframe().head() ---------------------------------------- ## Create Custom Metric """ Create Custom Metric ================== Create a custom trajectory metric. """ ### Required imports from datetime import datetime # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, generate_state_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, StateSequencePool, ) from tanat.trajectory import TrajectoryPool ## Data Setup Let's create some simple sequence data to demonstrate a custom trajectory metric.
N_SEQ = 100 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 # Generate simple sequences for clear metric demonstration simple_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=SEED, ) simple_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } simple_pool = EventSequencePool(simple_data, simple_settings) simple_pool # Generate another set of simple sequences (states) simple_data = generate_state_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=["Z", "Y", "X", "W"], missing_data=0.0, entity_feature="states", seed=SEED, ) simple_settings = { "id_column": "id", "start_column": "start_date", "default_end_value": datetime.now(), # Avoid warning "entity_features": ["states"], } simple_pool_2 = StateSequencePool(simple_data, simple_settings) simple_pool_2 # Build trajectory pool trajectory_pool = TrajectoryPool.init_empty() trajectory_pool.add_sequence_pool(simple_pool, "events") trajectory_pool.add_sequence_pool(simple_pool_2, "states") # Configure settings trajectory_pool.update_settings(intersection=False) ### Custom Trajectory Metric Define a dummy trajectory metric that consistently returns a fixed distance value between two trajectories. **Minimal implementation**: Override `_compute_single_distance(traj_a, traj_b)` to define your metric logic. **Performance optimization**: For large pools, you can also override `_compute_distances(dm, trajectory_pool)` to use vectorized operations or parallel processing (see AggregationTrajectoryMetric for an example). # Create a custom trajectory metric from pydantic.dataclasses import dataclass, Field from tanat.metric.trajectory.base.metric import TrajectoryMetric from tanat.metric.matrix import MatrixStorageOptions @dataclass class DummySettings: """Settings for the dummy metric. Note: distance_matrix is required for compute_matrix() support. 
""" value: int = 42 # Distance value to return # Required for compute_matrix() support distance_matrix: MatrixStorageOptions = Field(default_factory=MatrixStorageOptions) class DummyTrajectoryMetric(TrajectoryMetric, register_name="dummy"): """Metric that computes a dummy distance between two trajectories.""" SETTINGS_DATACLASS = DummySettings def __init__(self, settings=None): if settings is None: settings = DummySettings() super().__init__(settings) def _compute_single_distance(self, traj_a, traj_b): """ Compute dummy distance between two trajectories. """ # always return the fixed value from settings return self.settings.value # Access two simple trajectories traj_1 = trajectory_pool["seq-0"] traj_2 = trajectory_pool["seq-1"] # Test custom metric custom_metric = DummyTrajectoryMetric() custom_metric(traj_1, traj_2) dm = custom_metric.compute_matrix(trajectory_pool) dm.to_dataframe().head() ---------------------------------------- ## Hierarchical Clustering """ Hierarchical Clustering ======================= Perform hierarchical clustering on temporal data. """ This example demonstrates how to perform hierarchical clustering on both sequence pools and trajectory pools. ### Required Imports # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, ) # Trajectory pools from tanat.trajectory import ( TrajectoryPool, ) # Clustering tools from tanat.clustering import ( HierarchicalClusterer, HierarchicalClustererSettings, ) ## 1. Data Initialization and Generation We generate simple event sequences to use as input for clustering. 
NUM_SEQUENCES = 1000 SEQUENCE_LENGTHS = [4, 5, 6, 7, 8, 9, 10, 11, 12] RANDOM_SEED = 42 # Generate synthetic event sequences event_data = generate_event_sequences( n_seq=NUM_SEQUENCES, seq_size=SEQUENCE_LENGTHS, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=RANDOM_SEED, ) # Define event sequence settings event_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } event_pool = EventSequencePool(event_data, event_settings) event_pool ## 2. Hierarchical Clustering on a Sequence Pool We cluster individual event sequences using a linear pairwise distance metric. # Initialize the clusterer with default settings hc_settings = HierarchicalClustererSettings( metric="linearpairwise", # Sequence-level metric cluster_column="hclass", # Column where cluster labels will be stored ) clusterer = HierarchicalClusterer(settings=hc_settings) # Show clusterer settings clusterer # Fit the clusterer on the sequence pool clusterer.fit(event_pool) # Show clustering summary clusterer ## 3. Hierarchical Clustering on a Trajectory Pool Clustering entire trajectories using a trajectory-level metric (e.g., aggregation). # Initialize and populate a trajectory pool trajectory_pool = TrajectoryPool.init_empty() trajectory_pool.add_sequence_pool(event_pool, "events") # Configure a new clusterer for trajectory clustering hc_settings = HierarchicalClustererSettings( metric="aggregation", # Trajectory-level distance metric cluster_column="hclass", # Cluster labels stored here ) clusterer = HierarchicalClusterer(settings=hc_settings) # Fit the clusterer on the trajectory pool clusterer.fit(trajectory_pool) # Summarize results clusterer # Access clustering results from the static data trajectory_pool.static_data.head() ---------------------------------------- ## PAM Clustering """ PAM Clustering ============== Perform PAM clustering on temporal data. 
""" This example demonstrates how to perform PAM (Partition Around Medoids) clustering on sequence pools. PAM is a robust clustering algorithm that selects actual data points as cluster centers (medoids). ### Required Imports # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, ) # Trajectory pools from tanat.trajectory import ( TrajectoryPool, ) # Clustering tools from tanat.clustering import ( PAMClusterer, PAMClustererSettings, ) ## 1. Data Initialization and Generation We generate synthetic event sequences to use as input for clustering. NUM_SEQUENCES = 1000 SEQUENCE_LENGTHS = [5, 6, 7, 8, 9, 10] RANDOM_SEED = 42 # Generate synthetic event sequences event_data = generate_event_sequences( n_seq=NUM_SEQUENCES, seq_size=SEQUENCE_LENGTHS, vocabulary=["A", "B", "C", "D", "E"], missing_data=0.0, entity_feature="event", seed=RANDOM_SEED, ) # Define event sequence settings event_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } event_pool = EventSequencePool(event_data, event_settings) event_pool ## 2. PAM Clustering with Default Settings PAM clustering minimizes the inertia by selecting actual sequences as medoids (cluster centers). .. important:: PAM precomputes the full distance matrix, which can be memory-intensive for large datasets. For datasets with thousands of sequences, consider using CLARA instead. # Initialize the PAM clusterer with 3 clusters pam_settings = PAMClustererSettings( metric="linearpairwise", # Sequence-level metric n_clusters=3, # Number of clusters to form max_iter=100, # Maximum iterations for optimization cluster_column="pam_cluster", # Column where cluster labels will be stored ) clusterer = PAMClusterer(settings=pam_settings) # Show clusterer settings clusterer ## 3. Fit the Model Apply PAM clustering to the sequence pool. 
# Fit the clusterer clusterer.fit(event_pool) # View cluster assignments event_pool.static_data ## 4. Access Medoids PAM selects actual sequences as medoids (representative sequences for each cluster). These medoids minimize the total distance to all sequences in their cluster. # Get the medoid sequences medoids = clusterer.medoids print(f"Medoid IDs: {medoids}") # View medoid sequences for medoid_id in medoids: print(f"\nMedoid {medoid_id}:") print(event_pool[medoid_id].sequence_data) ## 5. Custom Distance Metric You can use different distance metrics depending on your needs. # Using Edit distance (Levenshtein) metric pam_edit_settings = PAMClustererSettings( metric="edit", n_clusters=3, cluster_column="pam_edit_cluster", ) clusterer_edit = PAMClusterer(settings=pam_edit_settings) clusterer_edit.fit(event_pool) # Access clustering results from the static data event_pool.static_data ## 6. PAM Clustering on Trajectory Pools PAM also works on trajectory pools. You need to use a trajectory-level metric. # Initialize and populate a trajectory pool trajectory_pool = TrajectoryPool.init_empty() trajectory_pool.add_sequence_pool(event_pool, "events") # Configure PAM for trajectories with appropriate metric pam_traj_settings = PAMClustererSettings( metric="aggregation", # Trajectory-level distance metric n_clusters=3, cluster_column="pam_traj_cluster", ) clusterer_traj = PAMClusterer(settings=pam_traj_settings) clusterer_traj.fit(trajectory_pool) # View results trajectory_pool.static_data.head() ---------------------------------------- ## CLARA Clustering """ CLARA Clustering ================ Perform CLARA clustering on temporal data. """ This example demonstrates how to perform CLARA clustering on large sequence pools. CLARA (Clustering LARge Applications) is designed for large datasets and uses sampling to make PAM clustering scalable. 
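To make the sampling idea concrete, here is a minimal, library-independent sketch of CLARA on toy numeric data. Everything in it is an assumption for illustration: the helper names (`k_medoids`, `clara`, `total_cost`), the `n_samples` parameter (standing in for the number of PAM runs), and the use of a simplified alternating k-medoids instead of the full PAM swap search. It does not reproduce TanaT's implementation; in TanaT the items would be sequences and `dist` a sequence metric.

```python
import random


def total_cost(items, medoids, dist):
    """Sum of distances from each item to its nearest medoid."""
    return sum(min(dist(x, m) for m in medoids) for x in items)


def k_medoids(items, k, dist, max_iter=100, rng=None):
    """Simplified k-medoids (alternating assign/update, not the full PAM swap search)."""
    if rng is None:
        rng = random.Random(0)
    medoids = rng.sample(items, k)
    for _ in range(max_iter):
        # Assign each item to its nearest medoid
        clusters = {i: [] for i in range(k)}
        for x in items:
            nearest = min(range(k), key=lambda i: dist(x, medoids[i]))
            clusters[nearest].append(x)
        # In each cluster, pick the member with the smallest total in-cluster distance
        new_medoids = [
            min(members, key=lambda c: sum(dist(c, x) for x in members))
            if members else medoids[i]
            for i, members in clusters.items()
        ]
        if new_medoids == medoids:
            break
        medoids = new_medoids
    return medoids


def clara(items, k, dist, sampling_ratio=0.4, n_samples=3, seed=42):
    """Run k-medoids on several random samples; keep the medoids that fit ALL items best."""
    rng = random.Random(seed)
    sample_size = max(k, int(len(items) * sampling_ratio))
    best_medoids, best_cost = None, float("inf")
    for _ in range(n_samples):
        sample = rng.sample(items, sample_size)
        medoids = k_medoids(sample, k, dist, rng=rng)
        cost = total_cost(items, medoids, dist)  # evaluated on the full dataset
        if cost < best_cost:
            best_medoids, best_cost = medoids, cost
    return best_medoids, best_cost


def abs_diff(a, b):
    return abs(a - b)


# Toy data: three well-separated groups of integers
points = list(range(0, 10)) + list(range(100, 110)) + list(range(200, 210))
medoids, cost = clara(points, k=3, dist=abs_diff)
print(medoids, cost)
```

The key point is that only the sample's pairwise distances are needed during clustering, while the quality of each candidate set of medoids is still judged against the whole dataset.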
### Required Imports # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, ) # Trajectory pools from tanat.trajectory import ( TrajectoryPool, ) # Clustering tools from tanat.clustering import ( CLARAClusterer, CLARAClustererSettings, ) ## 1. Data Initialization and Generation For CLARA, we generate a larger dataset to demonstrate its scalability. CLARA is designed for datasets that would be too large for standard PAM. NUM_SEQUENCES = 1000 # Larger dataset SEQUENCE_LENGTHS = [5, 6, 7, 8, 9, 10] RANDOM_SEED = 42 # Generate synthetic event sequences event_data = generate_event_sequences( n_seq=NUM_SEQUENCES, seq_size=SEQUENCE_LENGTHS, vocabulary=["A", "B", "C", "D", "E"], missing_data=0.0, entity_feature="event", seed=RANDOM_SEED, ) # Define event sequence settings event_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } event_pool = EventSequencePool(event_data, event_settings) event_pool ## 2. CLARA Clustering with Default Settings CLARA (Clustering LARge Applications) divides the dataset into multiple samples, applies PAM to each sample, and selects the best set of medoids. .. note:: CLARA is particularly useful when your dataset is too large to compute the full distance matrix required by standard PAM. # Initialize the CLARA clusterer clara_settings = CLARAClustererSettings( metric="linearpairwise", # Sequence-level metric n_clusters=3, # Number of clusters to form sampling_ratio=0.4, # Use 40% of data for each PAM run nb_pam_instances=3, # Run 3 independent PAM instances max_iter=100, # Maximum iterations per PAM run cluster_column="clara_cluster", # Column where cluster labels will be stored ) clusterer = CLARAClusterer(settings=clara_settings) # Show clusterer settings clusterer ## 3. 
Understanding CLARA Parameters **Key parameters explained:** - ``sampling_ratio``: Fraction of data to sample for each PAM instance (default: 0.4) - ``nb_pam_instances``: Number of independent PAM runs with different samples (default: 5) - ``max_iter``: Maximum iterations for each PAM optimization ## 4. Fit the Model Apply CLARA clustering to the sequence pool. # Fit the clusterer clusterer.fit(event_pool) # View cluster assignments event_pool.static_data ## 5. Access Medoids Like PAM, CLARA selects actual sequences as medoids. The final medoids are chosen from the best PAM run. # Get the medoid sequences medoids = clusterer.medoids print(f"Medoid IDs: {medoids}") # View medoid sequences for medoid_id in medoids: print(f"\nMedoid {medoid_id}:") print(event_pool[medoid_id].sequence_data) ## 6. Tuning CLARA Parameters Adjust sampling and number of PAM instances for your dataset size. # More aggressive sampling for very large datasets clara_large_settings = CLARAClustererSettings( metric="edit", n_clusters=4, sampling_ratio=0.2, # Smaller samples nb_pam_instances=5, # More PAM runs for better coverage cluster_column="clara_large_cluster", ) clusterer_large = CLARAClusterer(settings=clara_large_settings) clusterer_large.fit(event_pool) # Access clustering results from the static data event_pool.static_data ## 7. CLARA Clustering on Trajectory Pools CLARA also works on trajectory pools. You need to use a trajectory-level metric. 
# Initialize and populate a trajectory pool trajectory_pool = TrajectoryPool.init_empty() trajectory_pool.add_sequence_pool(event_pool, "events") # Configure CLARA for trajectories with appropriate metric clara_traj_settings = CLARAClustererSettings( metric="aggregation", # Trajectory-level distance metric n_clusters=3, sampling_ratio=0.3, nb_pam_instances=3, cluster_column="clara_traj_cluster", ) clusterer_traj = CLARAClusterer(settings=clara_traj_settings) clusterer_traj.fit(trajectory_pool) # View results trajectory_pool.static_data.head() ---------------------------------------- ## Timeline Visualization """ Timeline Visualization ====================== Visualize sequences as time-aligned timelines. """ This example demonstrates how to visualize sequence data using timeline representations. Timeline visualizations show sequences over time with temporal alignment, making them ideal for analyzing event patterns, state durations, and temporal relationships. ### Required Imports from datetime import datetime # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, generate_state_sequences, generate_interval_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, StateSequencePool, IntervalSequencePool, EventSequenceSettings, StateSequenceSettings, IntervalSequenceSettings, ) # Timeline visualization from tanat.visualization.sequence import SequenceVisualizer ## 1. Event Sequence Timelines We'll start with event sequences to demonstrate basic timeline functionality. Event sequences show discrete events occurring at specific points in time. 
# Generate event sequences event_data = generate_event_sequences( n_seq=30, seq_size=[15, 20, 25, 30], vocabulary=["Login", "Purchase", "Logout", "Support"], missing_data=0.0, entity_feature="action", seed=42, ) # Create event sequence pool event_settings = EventSequenceSettings( id_column="id", time_column="date", entity_features=["action"], ) event_pool = EventSequencePool(event_data, event_settings) print(f"Event pool: {len(event_pool.unique_ids)} sequences") event_pool ## 2. Basic Timeline Visualization Timeline visualizations can render individual sequences or entire pools with different stacking and temporal alignment options. # Basic timeline with default settings # This shows a timeline by user actions. # fmt: off SequenceVisualizer.timeline() \ .title("User Action Timeline") \ .legend(show=True, title="Actions", loc="upper right") \ .draw(event_pool) # fmt: on # Single sequence timeline for detailed view single_sequence = event_pool["seq-0"] # fmt: off SequenceVisualizer.timeline() \ .title("Single User Journey") \ .colors("Set2") \ .marker(size=30, alpha=0.8) \ .legend(show=True, title="User Actions", loc="upper right") \ .y_axis(show=True, label="User ID") \ .draw(single_sequence) # fmt: on ## 3. Stacking Modes Different stacking modes organize multiple sequences in various ways. # Category stacking - groups similar sequences # Explicitly stacks sequences by their categories (e.g., actions). # fmt: off SequenceVisualizer.timeline(stacking_mode="by_category") \ .title("Timeline by Category Stacking") \ .colors("tab10") \ .legend(show=True, loc="center right") \ .draw(event_pool) # fmt: on # Flat stacking - each sequence on its own row # This shows each sequence as a separate row, useful for comparing patterns. # fmt: off SequenceVisualizer.timeline(stacking_mode="flat") \ .title("Flat Timeline - One Sequence per Row") \ .colors("Paired") \ .marker(spacing=0.8, size=8) \ .legend(show=True, loc="center right") \ .draw(event_pool) # fmt: on ## 4. 
Relative Time Alignment Relative time mode aligns all sequences to a common starting point for pattern comparison. # Relative time timeline # Aligns all sequences to start from the first event in each sequence. # fmt: off SequenceVisualizer.timeline( relative_time=True, granularity="day", stacking_mode="flat" ) \ .title("User Actions - Relative Timeline (Days)") \ .colors("Accent") \ .marker(size=10, alpha=0.7, spacing=0.6) \ .legend(show=True, title="Actions", loc="upper right") \ .x_axis(label="Days from Start") \ .draw(event_pool) # fmt: on ## 5. State Sequence Timelines State sequences show periods/durations rather than discrete events. # Generate state sequences state_data = generate_state_sequences( n_seq=25, seq_size=[10, 15, 20], vocabulary=["Active", "Inactive", "Maintenance", "Error"], missing_data=0.1, entity_feature="status", seed=42, ) # Create state sequence pool state_settings = StateSequenceSettings( id_column="id", start_column="start_date", entity_features=["status"], default_end_value=datetime.now(), ) state_pool = StateSequencePool(state_data, state_settings) print(f"State pool: {len(state_pool.unique_ids)} sequences") state_pool # State timeline visualization # This shows a timeline of system states over time. # fmt: off SequenceVisualizer.timeline(stacking_mode="flat") \ .title("System Status Timeline") \ .colors("Set1") \ .marker(spacing=0.9) \ .legend(show=True, title="System Status", loc="center left") \ .x_axis(label="Time") \ .draw(state_pool) # fmt: on ## 6. Interval Sequence Timelines Interval sequences have both start and end times, showing duration explicitly. 
# Generate interval sequences interval_data = generate_interval_sequences( n_seq=30, seq_size=[8, 12, 15], vocabulary=["Meeting", "Break", "Work", "Travel"], missing_data=0.05, entity_feature="activity", seed=42, ) # Create interval sequence pool interval_settings = IntervalSequenceSettings( id_column="id", start_column="start_date", end_column="end_date", entity_features=["activity"], ) interval_pool = IntervalSequencePool(interval_data, interval_settings) print(f"Interval pool: {len(interval_pool.unique_ids)} sequences") interval_pool # Interval timeline with temporal alignment # # Align all sequences on the interval at position 7 (0-based indexing) # This sets that interval as the reference point (T=0) for all sequences interval_pool.zero_from_position(7) # Set the interval at position 7 as temporal baseline # fmt: off SequenceVisualizer.timeline( relative_time=True, granularity="hour", stacking_mode="flat" ) \ .title("Daily Activity Timeline (Hours)") \ .colors("Set2") \ .marker(spacing=0.9, alpha=0.8) \ .legend(show=True, title="Activities", loc="upper right") \ .x_axis(label="Hours from Start") \ .draw(interval_pool) # fmt: on ## 7. Advanced Customization Timeline visualizations support extensive customization of markers, colors, and themes. # Custom color mapping for specific categories custom_colors = { "Login": "#2E8B57", # Sea Green "Purchase": "#FF6347", # Tomato "Logout": "#4682B4", # Steel Blue "Support": "#DAA520", # Golden Rod } # fmt: off SequenceVisualizer.timeline(relative_time=True) \ .colors(custom_colors) \ .title("Custom Colored User Timeline") \ .marker( size=14, shape="D", # Diamond edge_color="black", alpha=0.9, spacing=0.5 ) \ .legend(show=True, title="User Actions", loc="upper right") \ .draw(event_pool) # fmt: on ## 8. Theme Applications Apply different themes for various presentation contexts. # Dark theme timeline # This shows a timeline with a dark background.
# fmt: off SequenceVisualizer.timeline( stacking_mode="flat", relative_time=True ) \ .colors("tab20") \ .title("Timeline - Dark Theme") \ .marker(size=10, alpha=0.9) \ .legend(show=True, title="Actions", loc="upper right") \ .set_theme("dark_background") \ .draw(event_pool) # fmt: on ## 9. Viewing Settings and Debugging Inspect current settings for troubleshooting and understanding configurations. # View current timeline settings timeline_viz = SequenceVisualizer.timeline() timeline_viz.view_settings() # Examine the prepared data structure data_preview = timeline_viz.prepare_data(event_pool) print("Prepared data sample:") data_preview.head() ## 10. Saving Timeline Visualizations Export timelines with custom resolution and file formats. # High-resolution timeline export # This saves the timeline visualization to a PNG file with 300 DPI. # fmt: off SequenceVisualizer.timeline( relative_time=True, stacking_mode="flat" ) \ .title("User Journey Analysis - Final") \ .colors("Set3") \ .marker(size=8, alpha=0.8) \ .legend(show=True, title="User Actions", loc="best") \ .x_axis(label="Timeline (Days)") \ .draw(event_pool) \ .save("user_timeline_analysis.png", dpi=300) # fmt: on ---------------------------------------- ## Histogram Visualization """ Histogram Visualization ======================= Aggregate sequence values into time-based histograms. """ This example demonstrates how to visualize sequence data using histogram representations. Histogram visualizations show frequency or duration distributions for sequence elements, making them ideal for analyzing occurrence patterns, time spent in states, or event frequencies. 
### Required Imports from datetime import datetime # Data simulation from tanat.dataset.simulation.sequence import ( generate_event_sequences, generate_state_sequences, generate_interval_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, StateSequencePool, IntervalSequencePool, EventSequenceSettings, StateSequenceSettings, IntervalSequenceSettings, ) # Histogram visualization from tanat.visualization.sequence import SequenceVisualizer ## 1. Event Sequence Histograms We'll start with event sequences to show frequency distributions of discrete events. # Generate event sequences for web user behavior event_data = generate_event_sequences( n_seq=100, seq_size=[10, 15, 20, 25], vocabulary=["PageView", "Click", "Purchase", "Search", "Login", "Logout"], missing_data=0.05, entity_feature="action", seed=42, ) # Create event sequence pool event_settings = EventSequenceSettings( id_column="id", time_column="date", entity_features=["action"], ) event_pool = EventSequencePool(event_data, event_settings) print(f"Event pool: {len(event_pool.unique_ids)} user sessions") event_pool ## 2. Basic Frequency Histograms Frequency histograms show how often each event type occurs across all sequences. # Basic occurrence histogram # Basic histogram showing event counts # fmt: off SequenceVisualizer.histogram() \ .title("User Action Frequency Distribution") \ .colors("Set2") \ .legend(show=True, title="Actions", loc="upper right") \ .x_axis(label="Action Types") \ .y_axis(label="Frequency") \ .draw(event_pool) # fmt: on # Single sequence histogram for comparison single_session = event_pool["seq-0"] # fmt: off SequenceVisualizer.histogram() \ .title("Single User Session - Action Frequency") \ .colors("tab10") \ .legend(show=True, loc="upper right") \ .draw(single_session) # fmt: on ## 3. Different Display Modes Histograms can show occurrence counts, frequency rates, or time spent. 
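As a plain-Python illustration of the first two modes, the sketch below counts occurrences and derives relative frequencies for a hypothetical list of actions. It assumes that "frequency" means counts normalized by the total number of events; how TanaT normalizes internally is not confirmed here. Time spent additionally requires durations, so it is not covered by this sketch.

```python
from collections import Counter

# Hypothetical flat list of actions pooled from several sequences
actions = ["Login", "Click", "Click", "Purchase", "Click", "Logout", "Login", "Click"]

# Occurrence: raw count per category
occurrence = Counter(actions)

# Frequency: counts normalized by the total number of events (assumed definition)
total = sum(occurrence.values())
frequency = {action: count / total for action, count in occurrence.items()}

# Bars sorted from most to least frequent, comparable to a descending bar order
descending = sorted(occurrence.items(), key=lambda kv: kv[1], reverse=True)

print(occurrence)
print(frequency)
print(descending)
```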
# Occurrence count (default mode) # Histogram showing raw counts of each action # fmt: off SequenceVisualizer.histogram(show_as="occurrence") \ .title("Action Occurrence Counts") \ .colors("tab10") \ .legend(show=True, title="User Actions", loc="upper right") \ .draw(event_pool) # fmt: on # Frequency mode (normalized counts) # Histogram showing relative frequency of each action # fmt: off SequenceVisualizer.histogram(show_as="frequency") \ .title("Action Frequency Distribution") \ .colors("Accent") \ .legend(show=True, title="Actions", loc="upper right") \ .y_axis(label="Relative Frequency") \ .draw(event_pool) # fmt: on ## 4. Bar Ordering Options Control the order of bars for better visualization of patterns. # Descending order - most frequent first # Histogram showing actions ordered by frequency # fmt: off SequenceVisualizer.histogram( show_as="occurrence", bar_order="descending" ) \ .title("Most Frequent Actions First") \ .colors("Dark2") \ .legend(show=True, loc="upper right") \ .draw(event_pool) # fmt: on # Ascending order - least frequent first # Histogram showing actions ordered by frequency, least to most # fmt: off SequenceVisualizer.histogram( show_as="occurrence", bar_order="ascending" ) \ .title("Least Frequent Actions First") \ .colors("Paired") \ .legend(show=True, loc="lower right") \ .draw(event_pool) # fmt: on ## 5. Orientation Options Horizontal bars can be more readable for certain data types. # Horizontal histogram - useful for long category names # Histogram showing actions in horizontal layout # fmt: off SequenceVisualizer.histogram( show_as="frequency", bar_order="descending", orientation="horizontal" ) \ .title("User Actions - Horizontal View") \ .colors("Set1") \ .legend(show=True, title="Actions", loc="upper right") \ .x_axis(label="Frequency") \ .y_axis(label="Action Types") \ .draw(event_pool) # fmt: on ## 6. State Sequence Histograms - Time Spent Analysis For state sequences, we can analyze time spent in different states. 
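For intuition, time spent can be thought of as the sum of state durations per category. The sketch below is a minimal, library-independent version with a hypothetical helper `time_spent_hours` and made-up state records; it is not how TanaT computes the histogram internally.

```python
from collections import defaultdict
from datetime import datetime


def time_spent_hours(states):
    """Sum the duration (in hours) of each state.

    states is an iterable of (label, start, end) tuples.
    Hypothetical helper for illustration only.
    """
    hours = defaultdict(float)
    for label, start, end in states:
        hours[label] += (end - start).total_seconds() / 3600
    return dict(hours)


# Made-up, contiguous states for one system
states = [
    ("Running", datetime(2024, 1, 1, 0), datetime(2024, 1, 1, 8)),
    ("Idle", datetime(2024, 1, 1, 8), datetime(2024, 1, 1, 10)),
    ("Running", datetime(2024, 1, 1, 10), datetime(2024, 1, 1, 13)),
]

spent = time_spent_hours(states)
print(spent)
```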
# Generate state sequences for system monitoring state_data = generate_state_sequences( n_seq=50, seq_size=[15, 20, 25], vocabulary=["Running", "Idle", "Maintenance", "Error", "Shutdown"], missing_data=0.1, entity_feature="system_status", seed=42, ) # Create state sequence pool state_settings = StateSequenceSettings( id_column="id", start_column="start_date", default_end_value=datetime.now(), # Avoid warning entity_features=["system_status"], ) state_pool = StateSequencePool(state_data, state_settings) # Set granularity for time calculations state_pool.to_relative_time(granularity="hour") print(f"State pool: {len(state_pool.unique_ids)} system monitoring sequences") state_pool # Time spent histogram - shows duration in each state # Histogram showing total time spent in each state # fmt: off SequenceVisualizer.histogram( show_as="time_spent", bar_order="descending", granularity="hour" ) \ .title("System Time Spent Analysis (Hours)") \ .colors("coolwarm") \ .legend(show=True, title="System Status", loc="upper right") \ .x_axis(label="System States") \ .y_axis(label="Total Hours") \ .draw(state_pool) # fmt: on # Horizontal time spent view for better readability # Histogram showing time spent in each state, horizontal layout # fmt: off SequenceVisualizer.histogram( show_as="time_spent", bar_order="descending", orientation="horizontal", granularity="hour" ) \ .title("System Uptime Analysis") \ .colors("RdYlBu") \ .legend(show=True, title="Status", loc="upper right") \ .x_axis(label="Hours") \ .y_axis(label="System States") \ .draw(state_pool) # fmt: on ## 7. Interval Sequence Histograms Interval sequences can show both occurrence and duration distributions. 
# Generate interval sequences for activity tracking interval_data = generate_interval_sequences( n_seq=40, seq_size=[8, 12, 16], vocabulary=["Meeting", "Email", "Development", "Break", "Planning"], missing_data=0.05, entity_feature="work_activity", seed=42, ) # Create interval sequence pool interval_settings = IntervalSequenceSettings( id_column="id", start_column="start_date", end_column="end_date", entity_features=["work_activity"], ) interval_pool = IntervalSequencePool(interval_data, interval_settings) print(f"Interval pool: {len(interval_pool.unique_ids)} work activity logs") interval_pool # Occurrence histogram for intervals # Histogram showing count of each work activity # fmt: off SequenceVisualizer.histogram( show_as="occurrence", bar_order="descending" ) \ .title("Work Activity Frequency") \ .colors("tab20") \ .legend(show=True, title="Activities", loc="upper right") \ .x_axis(label="Activity Types") \ .y_axis(label="Number of Sessions") \ .draw(interval_pool) # fmt: on # Time spent in different activities # Histogram showing total time spent in each work activity # fmt: off SequenceVisualizer.histogram( show_as="time_spent", bar_order="descending", orientation="horizontal" ) \ .title("Time Allocation by Activity") \ .colors("Spectral") \ .legend(show=True, title="Work Activities", loc="upper right") \ .x_axis(label="Total Time") \ .y_axis(label="Activity Types") \ .draw(interval_pool) # fmt: on ## 8. Custom Color Mappings Apply specific colors for meaningful categorical representation. 
# Define custom colors for work activities activity_colors = { "Meeting": "#FF6B6B", # Red - Meetings "Email": "#4ECDC4", # Teal - Email "Development": "#45B7D1", # Blue - Coding "Break": "#96CEB4", # Green - Breaks "Planning": "#FECA57", # Yellow - Planning } # fmt: off SequenceVisualizer.histogram( show_as="time_spent", bar_order="descending" ) \ .colors(activity_colors) \ .title("Work Time Distribution - Custom Colors") \ .legend(show=True, title="Activities", loc="upper right") \ .x_axis(label="Activities") \ .y_axis(label="Time Spent") \ .draw(interval_pool) # fmt: on ## 9. Theming and Advanced Styling Apply different themes and advanced styling options. # Dark theme histogram # Histogram showing activity distribution with dark theme # fmt: off SequenceVisualizer.histogram( show_as="occurrence", bar_order="descending" ) \ .colors("tab20") \ .title("Activity Distribution - Dark Theme") \ .legend(show=True, title="Activities", loc="upper right") \ .set_theme("dark_background") \ .draw(interval_pool) # fmt: on # Custom marker styling (for histogram bars) # Histogram showing styled bars with custom marker settings # fmt: off SequenceVisualizer.histogram( show_as="frequency", bar_order="descending" ) \ .colors("Set3") \ .title("Styled Frequency Distribution") \ .marker(alpha=0.8) \ .legend(show=True, title="Actions", loc="upper right") \ .draw(event_pool) # fmt: on ## 10. Settings Inspection and Debugging View current settings and examine data preparation. # Create histogram visualizer and inspect settings histogram_viz = SequenceVisualizer.histogram() histogram_viz.view_settings() # Examine prepared data structure prepared_data = histogram_viz.prepare_data(event_pool) print("Histogram data structure:") prepared_data.head(10) ## 11. Saving Histogram Visualizations Export histograms with various formats and resolutions. 
# Save high-resolution histogram # Histogram showing work activity time analysis, saved as PNG # fmt: off SequenceVisualizer.histogram( show_as="time_spent", bar_order="descending", orientation="horizontal" ) \ .colors("Set1") \ .title("Work Activity Time Analysis - Final Report") \ .legend(show=True, title="Activities", loc="upper right") \ .x_axis(label="Time Spent (Hours)") \ .y_axis(label="Activity Types") \ .draw(interval_pool) \ .save("work_activity_histogram.png", dpi=300) # fmt: on ---------------------------------------- ## Distribution Visualization """ Distribution Visualization ========================== Show state proportions over time. """ This example demonstrates how to visualize sequence data using distribution plots. Distribution visualization shows the proportion of different states over time, inspired by the State Sequence Distribution plots from TraMineR. **Important Note**: Distribution visualizations are specifically designed for **STATE SEQUENCES ONLY**. They analyze how states are distributed across time periods and are not applicable to event or interval sequences. ### Required Imports from datetime import datetime # Data access and simulation from tanat.dataset import access from tanat.dataset.simulation.sequence import generate_state_sequences # Sequence pool from tanat.sequence import StateSequencePool, StateSequenceSettings # Distribution visualization from tanat.visualization.sequence import SequenceVisualizer ## 1. Real-World Data: MVAD Dataset We'll start with the MVAD dataset (from the McVicar and Anyadike-Danes study, also distributed with TraMineR), a well-known dataset in sequence analysis that follows the transition from school to work of young adults. This creates a "classic" state distribution visualization. **Key Point**: MVAD is a state sequence dataset, which makes it well suited to distribution analysis!
# Load the MVAD dataset mvad_data = access("mvad") print("MVAD dataset overview:") print(mvad_data.head(10)) print(f"Dataset shape: {mvad_data.shape}") print(f"Unique states: {mvad_data['value'].unique()}") # Initialize MVAD as a state sequence pool mvad_settings = StateSequenceSettings( id_column="id", start_column="start", end_column="end", entity_features=["value"], ) mvad_pool = StateSequencePool(sequence_data=mvad_data, settings=mvad_settings) print(f"Number of sequences: {len(mvad_pool.unique_ids)}") mvad_pool ## 2. Creating "classic" State Distribution Plots The distribution visualization creates a stacked area plot showing the proportion of each state over time - this is the "classic" state distribution plot commonly used in sequence analysis. **This visualization answers**: "At each time point, what percentage of individuals are in each state?" # Basic MVAD distribution plot # This shows the proportion of each state over time # fmt: off SequenceVisualizer.distribution(granularity="day") \ .colors("Set1") \ .title("MVAD State Distribution Over Time") \ .legend(show=True, loc="upper right") \ .x_axis(label="Time") \ .y_axis(label="State Proportion") \ .draw(mvad_pool) # fmt: on ## 3. Distribution Types Distribution visualizations can show different types of aggregations: percentage, count, or proportion. 
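The three aggregation types differ only in how the per-time-point counts are scaled. The following sketch shows the arithmetic on a small hypothetical grid of states (rows are individuals, columns are time points); the `distribution` helper and its `kind` argument are illustrative names, not part of TanaT.

```python
from collections import Counter

# Hypothetical snapshot: the state of each individual at each time step.
# Rows are individuals, columns are time points t = 0..2.
states = [
    ["school", "school", "employment"],
    ["school", "training", "employment"],
    ["training", "employment", "employment"],
    ["school", "joblessness", "joblessness"],
]


def distribution(states, kind="percentage"):
    """Return one {state: value} dict per time point."""
    result = []
    for column in zip(*states):  # iterate over time points
        counts = Counter(column)
        total = sum(counts.values())
        if kind == "count":
            result.append(dict(counts))
        elif kind == "proportion":
            result.append({s: c / total for s, c in counts.items()})
        elif kind == "percentage":
            result.append({s: 100 * c / total for s, c in counts.items()})
        else:
            raise ValueError(f"unknown kind: {kind}")
    return result


counts = distribution(states, "count")
proportions = distribution(states, "proportion")
percentages = distribution(states, "percentage")
print(counts[0], proportions[0], percentages[0])
```

Because every individual is in exactly one state at each time point, proportions sum to 1 and percentages to 100 in every column.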
# Count distribution instead of percentage # This shows the absolute count of individuals in each state over time # fmt: off SequenceVisualizer.distribution( distribution_type="count", granularity="day" ) \ .colors("Set1") \ .title("MVAD State Counts Over Time") \ .legend(show=True, title="States") \ .draw(mvad_pool) # fmt: on # Proportion distribution (0-1 scale instead of 0-100) # This shows the proportion of individuals in each state at each time point # fmt: off SequenceVisualizer.distribution( distribution_type="proportion", granularity="day" ) \ .colors("Paired") \ .title("MVAD State Proportions (0-1 scale)") \ .legend(show=True) \ .draw(mvad_pool) # fmt: on ## 4. Unstacked Distributions For better comparison between states, we can create unstacked line plots. # Unstacked distribution - separate lines for each state # This shows each state as a separate line, useful for comparing trends # fmt: off SequenceVisualizer.distribution( distribution_type="percentage", stacked=False, granularity="day" ) \ .colors("tab10") \ .title("MVAD State Evolution (Unstacked)") \ .legend(show=True, title="States", loc="center right") \ .marker(alpha=0.7) \ .draw(mvad_pool) # fmt: on ## 5. Working with Simulated State Data Let's create some simulated state sequences to show other distribution features. **Remember**: Only state sequences work with distribution visualization! # Generate synthetic STATE sequences for additional examples synthetic_data = generate_state_sequences( n_seq=50, seq_size=[20, 25, 30], vocabulary=["Active", "Inactive", "Pending", "Completed"], missing_data=0.0, entity_feature="status", seed=42, ) # Create synthetic STATE sequence pool synthetic_settings = StateSequenceSettings( id_column="id", start_column="start_date", default_end_value=datetime.now(), entity_features=["status"], ) synthetic_pool = StateSequencePool(synthetic_data, synthetic_settings) synthetic_pool ## 6. 
Relative Time Distributions We can align sequences to a common starting point for pattern comparison. # Convert synthetic data to relative time synthetic_pool.to_relative_time(granularity="day") # Relative time distribution # fmt: off SequenceVisualizer.distribution( distribution_type="percentage", relative_time=True, granularity="day" ) \ .colors("Accent") \ .title("State Distribution with Relative Time") \ .legend(show=True, title="Status") \ .x_axis(label="Days from Start") \ .draw(synthetic_pool) # fmt: on ## 7. Theming and Customization TanaT allows changing the visual appearance using themes and custom styling. # Dark theme distribution # This shows the state distribution with a dark background # fmt: off SequenceVisualizer.distribution( distribution_type="percentage", granularity="day" ) \ .colors("tab20") \ .title("Dark Theme Distribution") \ .legend(show=True, title="States") \ .set_theme("dark_background") \ .draw(synthetic_pool) # fmt: on ## 8. Working with Individual State Sequences Distribution visualization can also be applied to single state sequences (though it's more meaningful for pools). # Single sequence distribution (less common but possible) single_sequence = synthetic_pool["seq-0"] # fmt: off SequenceVisualizer.distribution(granularity="day") \ .title("Single State Sequence Distribution") \ .colors("Set2") \ .legend(show=True) \ .draw(single_sequence) # fmt: on ## 9. Why Only State Sequences? Distribution visualizations are specifically designed for state sequences because: * **States have duration**: They occupy time periods, making "proportion at time t" meaningful * **Mutual exclusivity**: An individual can only be in one state at any given time * **Continuous coverage**: State sequences typically cover the entire observation period * **Meaningful aggregation**: Summing proportions across states gives 100% at each time point **Event sequences** show discrete occurrences and don't have inherent durations for proportion calculation. 
**Interval sequences** could in principle be supported, but this is still a work in progress. ## 10. Viewing Current Settings You can inspect the current visualization settings before drawing. # Create visualizer and view settings dist_viz = SequenceVisualizer.distribution() dist_viz.view_settings() ## 11. Saving Visualizations Visualizations can be saved to disk with custom resolution and file formats. # Save with high resolution # This saves the state distribution analysis as a PNG file # fmt: off SequenceVisualizer.distribution( distribution_type="percentage", granularity="day" ) \ .colors("Paired") \ .title("State Distribution Analysis") \ .legend(show=True, title="States", loc="upper right") \ .draw(synthetic_pool) \ .save("state_distribution_analysis.png", dpi=300) # fmt: on ---------------------------------------- ## Coxnet Survival Analysis """ Coxnet Survival Analysis ======================== This example shows how to apply Coxnet-based survival analysis on temporal sequences using TanaT. """ ### Required Imports import datetime import random import pandas as pd import numpy as np import matplotlib.pyplot as plt # Data simulation from tanat.dataset.simulation.sequence import generate_event_sequences # Sequence pool from tanat.sequence import EventSequencePool # Survival analysis from tanat.survival import SurvivalAnalysis ## 1. Data Setup We generate a set of event sequences and link them to static patient data (e.g., gender, age group, smoking status).
NUM_SEQUENCES = 100 SEQUENCE_LENGTHS = [4, 5, 6, 7, 8, 9, 10, 11, 12] RANDOM_SEED = 42 # Generate synthetic event sequences event_data = generate_event_sequences( n_seq=NUM_SEQUENCES, seq_size=SEQUENCE_LENGTHS, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=RANDOM_SEED, ) # Define sequence settings sequence_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } event_pool = EventSequencePool(event_data, sequence_settings) # Generate static features for each sequence (patient metadata) patient_ids = list(event_pool.unique_ids) static_data = pd.DataFrame( { "id": patient_ids, "gender": np.random.choice(["F", "M"], size=len(patient_ids)), "Age_Group": np.random.choice( ["40-49", "50-59", "60-69", "70-79"], size=len(patient_ids) ), "Smoker": np.random.choice([True, False], size=len(patient_ids)), } ) # Attach static features to the event pool event_pool.add_static_features(static_data) ## 2. Coxnet Survival Analysis We’ll now train a Coxnet model to predict survival probabilities, using a specific event (e.g., `'A'`) as the endpoint. 
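To make "using an event as the endpoint" concrete, here is a minimal sketch of how time-to-first-event labels could be derived by hand. It is not TanaT's implementation: the record layout, the `survival_label` helper, and the `end_of_study` censoring date are all assumptions made for illustration.

```python
from datetime import datetime

# Hypothetical event records: sequence id -> list of (timestamp, event label).
events = {
    "seq-0": [(datetime(2024, 1, 10), "B"), (datetime(2024, 2, 1), "A")],
    "seq-1": [(datetime(2024, 1, 5), "C"), (datetime(2024, 3, 1), "D")],
}
t_zero = datetime(2024, 1, 1)  # common starting point (T0)
end_of_study = datetime(2024, 4, 1)  # assumed censoring date


def survival_label(records, endpoint="A"):
    """Return (observed, duration_in_days) for one sequence."""
    hits = [ts for ts, label in records if label == endpoint and ts >= t_zero]
    if hits:
        # Endpoint observed: duration runs from T0 to its first occurrence.
        return True, (min(hits) - t_zero).days
    # Endpoint never observed: the sequence is censored at the end of study.
    return False, (end_of_study - t_zero).days


labels = {seq_id: survival_label(recs) for seq_id, recs in events.items()}
print(labels)
```

Each sequence yields an (observed, duration) pair, which is the usual input shape for survival models: sequences where the endpoint never occurs still contribute information up to their censoring time.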
# Initialize the survival analysis model surv = SurvivalAnalysis("coxnet") surv # Define the starting point (T0) for all sequences # 1 year ago event_pool.t_zero = datetime.datetime.now() - datetime.timedelta(days=365) # Split the data into training and testing sets all_ids = list(event_pool.unique_ids) train_ids = set(random.sample(all_ids, int(0.8 * len(all_ids)))) test_ids = set(all_ids) - train_ids train_pool = event_pool.subset(train_ids) test_pool = event_pool.subset(test_ids) # Extract survival times for patients (e.g., time to first event 'A') surv_res = surv.get_survival_array( sequence_pool=train_pool, query="event == 'A'", ) surv # Train the model using the training data fit_results = surv.fit(sequence_pool=train_pool, query="event == 'A'") # Predict survival functions for test patients survival_predictions = surv.predict_survival_function(sequence_or_pool=test_pool) # Plot predicted survival functions plt.figure(figsize=(12, 8)) for i, sf in enumerate(survival_predictions): plt.step(sf.x, sf.y, where="post", label=f"Patient {i+1}") plt.title("Predicted Survival Functions") plt.xlabel("Time (days)") plt.ylabel("Survival Probability") plt.grid(True) plt.legend(loc="best") plt.show() ---------------------------------------- ## Tree Survival Analysis """ Tree Survival Analysis ====================== This example shows how to apply a tree-based survival analysis model on temporal sequences using TanaT. """ ### Required Imports import datetime import random import pandas as pd import numpy as np import matplotlib.pyplot as plt # Data simulation from tanat.dataset.simulation.sequence import generate_event_sequences # Sequence pool from tanat.sequence import EventSequencePool # Survival analysis from tanat.survival import SurvivalAnalysis ## 1. Data Setup We generate a set of synthetic event sequences and add static attributes (e.g., gender, age, smoking status) for each patient. 
NUM_SEQUENCES = 100 SEQUENCE_LENGTHS = [4, 5, 6, 7, 8, 9, 10, 11, 12] RANDOM_SEED = 42 # Generate synthetic event sequences event_data = generate_event_sequences( n_seq=NUM_SEQUENCES, seq_size=SEQUENCE_LENGTHS, vocabulary=["A", "B", "C", "D"], missing_data=0.0, entity_feature="event", seed=RANDOM_SEED, ) # Define sequence settings sequence_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event"], } event_pool = EventSequencePool(event_data, sequence_settings) # Create static features (demographics) patient_ids = list(event_pool.unique_ids) static_data = pd.DataFrame( { "id": patient_ids, "gender": np.random.choice(["F", "M"], size=len(patient_ids)), "Age_Group": np.random.choice( ["40-49", "50-59", "60-69", "70-79"], size=len(patient_ids) ), "Smoker": np.random.choice([True, False], size=len(patient_ids)), } ) # Add static features to the pool event_pool.add_static_features(static_data) ## 2. Tree-Based Survival Analysis We train a tree-based survival model using a specified event (e.g., `'A'`) as the failure condition. 
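When comparing models, it can help to make the train/test split reproducible. The sketch below uses only the standard library; `split_ids` is a hypothetical helper, and the identifiers are made up for the example.

```python
import random


def split_ids(ids, train_fraction=0.8, seed=42):
    """Split ids into disjoint train/test sets, reproducibly."""
    ordered = sorted(ids)  # fix the order so the seed fully determines the split
    rng = random.Random(seed)  # local generator; global random state is untouched
    n_train = int(train_fraction * len(ordered))
    train = set(rng.sample(ordered, n_train))
    test = set(ordered) - train
    return train, test


all_ids = [f"seq-{i}" for i in range(100)]  # hypothetical sequence ids
train_ids, test_ids = split_ids(all_ids)
print(len(train_ids), len(test_ids))
```

Using a dedicated `random.Random` instance keeps the split stable across runs without reseeding the global generator that other code may rely on.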
# Initialize the model with tree-based backend surv = SurvivalAnalysis("tree") surv # Define baseline time (T0) for survival computation # 1 year ago event_pool.t_zero = datetime.datetime.now() - datetime.timedelta(days=365) # Split data into training and test sets all_ids = list(event_pool.unique_ids) train_ids = set(random.sample(all_ids, int(0.8 * len(all_ids)))) test_ids = set(all_ids) - train_ids train_pool = event_pool.subset(train_ids) test_pool = event_pool.subset(test_ids) # Compute survival labels: time to first occurrence of event 'A' surv_res = surv.get_survival_array(sequence_pool=train_pool, query="event == 'A'") surv # Train the model fit_results = surv.fit(sequence_pool=train_pool, query="event == 'A'") # Predict survival functions on new patients survival_predictions = surv.predict_survival_function(sequence_or_pool=test_pool) # Plot survival functions plt.figure(figsize=(12, 8)) for i, sf in enumerate(survival_predictions): plt.step(sf.x, sf.y, where="post", label=f"Patient {i+1}") plt.title("Predicted Survival Functions") plt.xlabel("Time (days)") plt.ylabel("Survival Probability") plt.grid(True) plt.legend(loc="best") plt.show() ---------------------------------------- ## Sequence Simulation # Sequence Simulation This notebook demonstrates how to simulate synthetic sequences using *TanaT*. We'll explore three types of sequences: event sequences (point-in-time occurrences), state sequences (persistent conditions), and interval sequences (activities with durations). 
These simulation tools are essential for: - Testing sequence analysis algorithms - Generating synthetic data for research - Understanding the impact of sequence characteristics on analysis outcomes - Creating controlled experiments with known ground truth ### Required imports from datetime import datetime import numpy as np import matplotlib.pyplot as plt from sklearn.metrics import adjusted_rand_score # Simulation imports from tanat.dataset.simulation.sequence import ( SequencePoolMocker, Profile, StateTimeDesign, EventTimeDesign, GenMethod, TimeStrategy, generate_event_sequences, generate_state_sequences, generate_interval_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, StateSequencePool, IntervalSequencePool, ) # Clustering for evaluation from tanat.clustering import ( HierarchicalClustererSettings, HierarchicalClusterer, ) ## 1. Basic Sequence Generation *TanaT* provides simple functions to generate synthetic sequences for testing and experimentation. # Global settings N_SEQ = 1000 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 ### Event Sequences Event sequences represent point-in-time occurrences, such as medical visits or biomarker measurements. # Generate event sequences representing medical visits event_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=[ "GENERAL_PRACTITIONER", "SPECIALIST", "RADIOLOGIST", "EMERGENCY", ], missing_data=0.1, entity_feature="event_type", seed=SEED, ) print("Event sequence data:") event_data.head(10) ### State Sequences State sequences represent conditions that persist over time, such as health states or treatment phases. 
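Because states are contiguous, a state sequence can be described by start times alone: each state ends where the next one begins, and only the last state needs an explicit end. The sketch below illustrates that idea on hypothetical records; whether TanaT derives end times exactly this way is an assumption, and `to_spells` is an illustrative helper rather than a library function.

```python
from datetime import datetime

# Hypothetical state records for one individual: (start, state), unordered.
records = [
    (datetime(2024, 3, 1), "CONVALESCENCE"),
    (datetime(2024, 1, 1), "HEALTHY"),
    (datetime(2024, 2, 1), "TREATMENT"),
]
default_end = datetime(2024, 6, 1)  # stand-in for a default end value


def to_spells(records, default_end):
    """Turn start-only records into contiguous (start, end, state) spells."""
    ordered = sorted(records)  # chronological order
    spells = []
    for i, (start, state) in enumerate(ordered):
        # A state ends where the next one starts; the last one uses the default.
        end = ordered[i + 1][0] if i + 1 < len(ordered) else default_end
        spells.append((start, end, state))
    return spells


spells = to_spells(records, default_end)
for start, end, state in spells:
    print(state, start.date(), "->", end.date())
```

The resulting spells have no gaps and no overlaps, which is the constraint that distinguishes states from intervals.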
# Generate state sequences representing health conditions state_data = generate_state_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=[ "HEALTHY", "TREATMENT", "CONVALESCENCE", "CHRONIC_MONITORING", "REMISSION", ], missing_data=0.1, entity_feature="health_state", seed=SEED, ) print("State sequence data:") state_data.head(10) ### Interval Sequences Interval sequences represent activities with defined start and end times, such as medication treatments or procedures. # Generate interval sequences representing medication treatments interval_data = generate_interval_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=[ "ANTIBIOTIC", "PAIN_RELIEVER", "CORTICOSTEROID", "ANTICOAGULANT", "ANTIHYPERTENSIVE", ], missing_data=0.1, entity_feature="medication", seed=SEED, ) print("Interval sequence data:") interval_data.head(10) ## 2. Creating Sequence Pools Sequence pools organize and manage collections of sequences, providing methods for analysis and manipulation. # Create sequence pools from the generated data event_settings = { "id_column": "id", "time_column": "date", "entity_features": ["event_type"], } state_settings = { "id_column": "id", "start_column": "start_date", "entity_features": ["health_state"], "default_end_value": datetime.now(), } interval_settings = { "id_column": "id", "start_column": "start_date", "end_column": "end_date", "entity_features": ["medication"], } # Initialize sequence pools event_pool = EventSequencePool(event_data, event_settings) state_pool = StateSequencePool(state_data, state_settings) interval_pool = IntervalSequencePool(interval_data, interval_settings) print(f"Event pool: {len(event_pool)} sequences") print(f"State pool: {len(state_pool)} sequences") print(f"Interval pool: {len(interval_pool)} sequences") ## 3. 
Advanced Simulation with SequencePoolMocker For more complex simulations, *TanaT* provides the SequencePoolMocker class, which allows creating distinct patient profiles with different sequence characteristics. ### Simulating Patient Profiles We'll create two distinct patient groups with different health state patterns to demonstrate clustering capabilities. # Create simulation engine for state sequences mocker = SequencePoolMocker("state", seed=SEED) # Group A: Acute illness and recovery pattern states_A = [ "HEALTHY", "SICK", "TREATMENT", "RECOVERY", "CONVALESCENCE", "FOLLOW_UP", "DISCHARGED", ] gen_A = GenMethod.init("random") gen_A.update_settings(vocabulary=states_A) profile_A = Profile( n_seq=N_SEQ, sequence_size=SIZE_DISTRIBUTION, entity_features={"state": gen_A}, profile_id="Acute_Recovery", ) # Group B: Chronic relapsing condition pattern states_B = [ "SICK", "RELAPSE", "TREATMENT", "STABLE", "REMISSION", "FLARE_UP", "MAINTENANCE", ] gen_B = GenMethod.init("random") gen_B.update_settings(vocabulary=states_B) profile_B = Profile( n_seq=N_SEQ, sequence_size=SIZE_DISTRIBUTION, entity_features={"state": gen_B}, profile_id="Chronic_Relapsing", ) # Add profiles to simulator mocker.add_profile(profile_A) mocker.add_profile(profile_B) ### Configuring Time Strategies Time strategies define how temporal aspects of sequences are generated. 
# Configure time strategies t0_strat = TimeStrategy.init("fixed") t0_strat.update_settings(t0_date="2020-01-01") sampling_strat = TimeStrategy.init("sequence_specific") sampling_strat.update_settings( distribution="uniform", min_date="2020-01-01", max_date="2023-01-01" ) mocker.set_time_design( StateTimeDesign(t0_strategy=t0_strat, sampling_strategy=sampling_strat) ) # Generate the sequence pool simulated_pool = mocker() print( f"Generated pool with {len(simulated_pool)} sequences ({N_SEQ} profile A, {N_SEQ} profile B)" ) # Update default end value simulated_pool.update_settings(default_end_value=datetime.now()) print(simulated_pool.statistics) ## 4. Preparing a Future Analysis Using Simulation Simulation can help evaluate the most suitable analysis techniques and guide experimental design before conducting real-world tests. ### Impact of Sequence Length on Clustering Let's examine how sequence length affects the ability to distinguish between different profiles. def compute_clustering_accuracy(pool): """Compute Adjusted Rand Index for clustering accuracy.""" df = pool.static_data[["_PROFILE_ID_", "hclusters"]].copy() true_labels = ( df["_PROFILE_ID_"].map({"Acute_Recovery": 0, "Chronic_Relapsing": 1}).values ) pred_labels = df["hclusters"].values return adjusted_rand_score(true_labels, pred_labels) # Test different sequence lengths sequence_lengths = [3, 5, 8, 12, 15, 18, 20, 23, 25, 30] accuracy_scores = [] for length in sequence_lengths: # Update profile settings profile_A.sequence_size = length profile_B.sequence_size = length # Generate new pool test_pool = mocker(profiles=[profile_A, profile_B]) # Update default end value to avoid warning test_pool.update_settings(default_end_value=datetime.now()) # Cluster and evaluate clusterer = HierarchicalClusterer( HierarchicalClustererSettings( metric="dtw", n_clusters=2, cluster_column="hclusters", ) ) clusterer.fit(test_pool) accuracy = compute_clustering_accuracy(test_pool) accuracy_scores.append(accuracy)
print(f"Length {length}: ARI = {accuracy:.3f}") Let's visualize the clustering accuracy as a function of sequence length. # Plot the results plt.figure(figsize=(10, 6)) plt.plot(sequence_lengths, accuracy_scores, marker="o", linewidth=2, markersize=8) plt.title("Impact of Sequence Length on Clustering Accuracy", fontsize=14) plt.xlabel("Sequence Length", fontsize=12) plt.ylabel("Adjusted Rand Index", fontsize=12) plt.ylim(-0.1, 1.05) plt.grid(True, alpha=0.3) plt.tight_layout() plt.show() The Adjusted Rand Index (ARI) quantifies the similarity between clustering results and ground truth, correcting for random agreement. An upward trend in ARI is observed as the sequence length increases, which suggests that longer sequences provide more information and make it easier to distinguish between the two simulated profiles. ## 5. Biomarker Event Simulation Let's simulate biomarker measurements as event sequences with continuous values. # Create biomarker simulation biomarker_mocker = SequencePoolMocker("event", seed=42) # Biomarker A: Normal range values biomarker_a_gen = GenMethod.init("random") biomarker_a_gen.update_settings(vocabulary=np.random.uniform(0, 1, 50)) # Biomarker B: Elevated values biomarker_b_gen = GenMethod.init("random") biomarker_b_gen.update_settings(vocabulary=np.random.uniform(2, 8, 50)) biomarker_profile = Profile( n_seq=N_SEQ, sequence_size=SIZE_DISTRIBUTION, entity_features={ # Multiple entity features "biomarker_a": biomarker_a_gen, "biomarker_b": biomarker_b_gen, }, missing_data={"biomarker_a": 0.05, "biomarker_b": 0.05}, ) biomarker_mocker.add_profile(biomarker_profile) # Configure sampling at specific intervals (baseline, 1 week, 1 month, 3 months) time_strat = TimeStrategy.init("fixed") time_strat.update_settings( t0_date="2023-01-01", sampling_steps=[7, 25, 62], # Days between measurements granularity="day", ) biomarker_mocker.set_time_design( EventTimeDesign(t0_strategy=time_strat, sampling_strategy=time_strat) ) biomarker_pool =
biomarker_mocker() # -- Overview print(biomarker_pool.statistics) Let's access a single sequence from the biomarker pool. # -- Access a single sequence biomarker_sequence = biomarker_pool["seq-0-profile-0"] print(biomarker_sequence.statistics) Let's access the first entity value of the sequence, which corresponds to the first biomarker measurement. # -- Access the first entity value (0-based) biomarker_sequence[0].value ---------------------------------------- ## Trajectory Simulation # Trajectory Simulation This notebook demonstrates how to simulate synthetic trajectories using *TanaT*. Trajectories combine multiple sequence types (events, states, intervals) to represent complex patient journeys. We'll explore how to combine realistic multi-dimensional temporal data into comprehensive trajectories. These simulation tools are essential for: - Testing sequence analysis algorithms - Generating synthetic data for research - Understanding the impact of sequence characteristics on analysis outcomes - Creating controlled experiments with known ground truth ### Required imports from datetime import datetime import numpy as np import pandas as pd # Simulation imports from tanat.dataset.simulation.sequence import ( generate_event_sequences, generate_state_sequences, generate_interval_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, StateSequencePool, IntervalSequencePool, ) # Trajectory pool from tanat.trajectory import TrajectoryPool ## 1. Creating Multi-Sequence Trajectories Trajectories represent complete patient journeys by combining different types of sequences. Let's simulate a healthcare scenario with medical visits, health states, and medication treatments. # Global settings N_SEQ = 150 SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 ### Generate Individual Sequence Types First, we'll create the component sequences that will form our trajectories.
# Generate event sequences (medical visits) event_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=[ "GENERAL_PRACTITIONER", "SPECIALIST", "EMERGENCY", "LABORATORY", "RADIOLOGIST", ], missing_data=0.1, entity_feature="visit_type", seed=SEED, ) # Generate state sequences (health conditions) state_data = generate_state_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=[ "HEALTHY", "ACUTE_ILLNESS", "TREATMENT", "RECOVERY", "CHRONIC_MONITORING", ], missing_data=0.05, entity_feature="health_state", seed=SEED, ) # Generate interval sequences (medication treatments) interval_data = generate_interval_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=[ "ANTIBIOTIC", "PAIN_RELIEVER", "ANTI_INFLAMMATORY", "ANTIHYPERTENSIVE", ], missing_data=0.15, entity_feature="medication", seed=SEED, ) print(f"Generated data for {N_SEQ} patients:") print(f"- Events: {len(event_data)} records") print(f"- States: {len(state_data)} records") print(f"- Intervals: {len(interval_data)} records") ### Create Sequence Pools Transform the raw data into *TanaT* sequence pools with appropriate settings. # Create sequence pools event_pool = EventSequencePool( event_data, { "id_column": "id", "time_column": "date", "entity_features": ["visit_type"], }, ) state_pool = StateSequencePool( state_data, { "id_column": "id", "start_column": "start_date", "default_end_value": datetime.now(), "entity_features": ["health_state"], }, ) interval_pool = IntervalSequencePool( interval_data, { "id_column": "id", "start_column": "start_date", "end_column": "end_date", "entity_features": ["medication"], }, ) ### Generate Static Patient Data Create demographic and clinical characteristics that will be shared across all sequences for each patient. 
def generate_patient_demographics(n_patients, seed=42): """Generate realistic patient demographic data.""" np.random.seed(seed) demographics = [] for i in range(n_patients): patient_id = f"seq-{i}" demographics.append( { "id": patient_id, "age_group": np.random.choice( ["18-30", "31-50", "51-70", "70+"], p=[0.2, 0.3, 0.3, 0.2] ), "gender": np.random.choice(["M", "F"], p=[0.48, 0.52]), "insurance_type": np.random.choice( ["PUBLIC", "PRIVATE", "MIXED"], p=[0.6, 0.3, 0.1] ), "chronic_condition": np.random.choice([True, False], p=[0.35, 0.65]), "risk_score": np.random.uniform(0, 10), } ) return pd.DataFrame(demographics) static_data = generate_patient_demographics(N_SEQ, seed=SEED) print("Patient demographics:") static_data.head() ## 2. Building the Trajectory Pool Combine the sequence pools and static data into a comprehensive trajectory pool. # Create trajectory pool trajectory_pool = TrajectoryPool.init_empty() # Add sequence pools with descriptive names trajectory_pool.add_sequence_pool(event_pool, "medical_visits") trajectory_pool.add_sequence_pool(state_pool, "health_states") trajectory_pool.add_sequence_pool(interval_pool, "medications") # Add static features trajectory_pool.add_static_features( static_data, id_column="id", static_features=[ "age_group", "gender", "insurance_type", "chronic_condition", "risk_score", ], ) # Configure trajectory pool settings trajectory_pool.update_settings( intersection=False, # Use union of IDs across SequencePools ) trajectory_pool ### Examine Individual Trajectories Let's look at a complete patient trajectory. # Examine a specific patient trajectory patient_id = "seq-5" patient_trajectory = trajectory_pool[patient_id] patient_trajectory ---------------------------------------- ## Data Wrangling for Sequences # Data Wrangling for Sequences This notebook demonstrates essential data wrangling techniques for sequence data in *TanaT*. 
We'll explore filtering, querying, pattern matching, and temporal alignment operations that are crucial for preparing sequence data for analysis. These techniques are essential for: - Preparing data for machine learning models - Extracting patient cohorts for clinical studies - Cleaning and validating temporal datasets - Creating analysis-ready sequence collections ### Required imports from datetime import datetime, timedelta import pandas as pd import numpy as np # Simulation imports from tanat.dataset.simulation.sequence import ( generate_event_sequences, generate_interval_sequences, generate_state_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, StateSequencePool, IntervalSequencePool, ) # Criterion for filtering from tanat.criterion import ( QueryCriterion, PatternCriterion, TimeCriterion, LengthCriterion, StaticCriterion, ) # Visualization from tanat.visualization.sequence import SequenceVisualizer ## 1. Data Setup We'll create a comprehensive healthcare dataset with three types of sequences: event sequences (discrete healthcare visits), state sequences (health conditions over time), and interval sequences (medication periods). This multi-modal approach reflects real-world healthcare data complexity. ### Dataset Configuration Define the parameters for our simulated healthcare dataset. # Dataset parameters N_SEQ = 150 # Number of patient sequences SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] # Variable sequence lengths SEED = 42 # For reproducible results ### Event Sequences: Healthcare Visits Generate sequences representing different types of healthcare visits with temporal ordering. 
# Healthcare visit types reflecting real clinical workflows event_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=[ "GENERAL_PRACTITIONER", "SPECIALIST", "NURSE", "RADIOLOGIST", "LABORATORY", "EMERGENCY", "PHARMACY", ], missing_data=0.15, # 15% missing data to simulate real-world conditions entity_feature="visit_type", seed=SEED, ) print(f"Generated {len(event_data)} event records across {N_SEQ} patients") ### State Sequences: Health Conditions Generate sequences representing patient health states that persist over time periods. # Health states following typical disease progression patterns state_data = generate_state_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=[ "HEALTHY", "SICK", "TREATMENT", "CONVALESCENCE", "CHRONIC_MONITORING", "REMISSION", ], missing_data=0.1, # Lower missing rate for health states entity_feature="health_state", seed=SEED, ) # Report generated state sequences state_data.describe(include="all") ### Interval Sequences: Medication Periods Generate sequences representing medication prescriptions with start and end dates. # Common medication categories with defined treatment periods interval_data = generate_interval_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=[ "ANTIBIOTIC", "PAIN_RELIEVER", "CORTICOSTEROID", "ANTICOAGULANT", "ANTIHYPERTENSIVE", "INSULIN", ], missing_data=0.2, # Higher missing rate for medication data entity_feature="medication", seed=SEED, ) # Report generated interval sequences interval_data.describe(include="all") ### Summary of Generated Data Overview of the three sequence types we'll use for data wrangling demonstrations. 
print("Dataset summary:") print(f"- Event sequences: {len(event_data)} records") print(f"- State sequences: {len(state_data)} records") print(f"- Interval sequences: {len(interval_data)} records") print(f"- Total patients: {N_SEQ}") ### Patient Demographics and Clinical Data Generate static patient characteristics that will enable demographic and clinical filtering demonstrations. def generate_patient_data(num_patients, seed=42): """ Generate realistic patient demographic and clinical data. """ np.random.seed(seed) patients = [] for i in range(num_patients): patient_id = f"seq-{i}" patients.append( { "id": patient_id, "gender": np.random.choice(["M", "F"], p=[0.48, 0.52]), "age": np.random.randint(18, 85), "insurance": np.random.choice( ["PUBLIC", "PRIVATE", "MIXED"], p=[0.6, 0.25, 0.15] ), "chronic_condition": np.random.choice([True, False], p=[0.4, 0.6]), "risk_level": np.random.choice( ["LOW", "MEDIUM", "HIGH"], p=[0.5, 0.3, 0.2] ), "comorbidity_count": np.random.poisson(1.2), } ) return pd.DataFrame(patients) ### Generate and Examine Patient Data Create the static patient dataset and explore its characteristics. # Generate patient demographics static_data = generate_patient_data(N_SEQ, seed=SEED) print("Patient demographics generated:") static_data.head() # Examine demographic distributions static_data.describe(include="all") ### Sequence Pool Configuration Sequence pools are the core data structures in TanaT that combine temporal sequence data with static patient characteristics. Each pool type handles different temporal patterns: - **Event pools**: Discrete time points (healthcare visits) - **State pools**: Persistent conditions over time periods - **Interval pools**: Activities with defined start and end times ### Event Sequence Pool Setup Configure the event pool to handle healthcare visit sequences with patient demographics. 
# Define shared static features for all sequence types static_features = [ "gender", "age", "insurance", "chronic_condition", "risk_level", "comorbidity_count", ] # Event sequence configuration event_settings = { "id_column": "id", "time_column": "date", "entity_features": ["visit_type"], "static_features": static_features, } event_pool = EventSequencePool( event_data, event_settings, static_data=static_data, ) print(f"Event pool created: {len(event_pool)} sequences") ### State Sequence Pool Setup Configure the state pool to handle health condition sequences with temporal persistence. # State sequence configuration state_settings = { "id_column": "id", "start_column": "start_date", "default_end_value": datetime.now(), # Avoid warnings for open-ended states "entity_features": ["health_state"], "static_features": static_features, } state_pool = StateSequencePool( state_data, state_settings, static_data=static_data, ) print(f"State pool created: {len(state_pool)} sequences") ### Interval Sequence Pool Setup Configure the interval pool to handle medication periods with defined durations. # Interval sequence configuration interval_settings = { "id_column": "id", "start_column": "start_date", "end_column": "end_date", "entity_features": ["medication"], "static_features": static_features, } interval_pool = IntervalSequencePool( interval_data, interval_settings, static_data=static_data ) print(f"Interval pool created: {len(interval_pool)} sequences") ### Sequence Pool Summary All sequence pools are now ready for data wrangling operations. 
print("All sequence pools initialized:") print(f"- Event sequences: {len(event_pool)} patients") print(f"- State sequences: {len(state_pool)} patients") print(f"- Interval sequences: {len(interval_pool)} patients") print(f"\nEach pool integrates {len(static_features)} static patient features") ### Initial Data Distribution Visualization Examine the distribution of event types, health states, and medications in our dataset before applying any filtering operations. # Visualize event type distribution color_map_event = { "GENERAL_PRACTITIONER": "#c8a2d8", "SPECIALIST": "#f8b3ba", "EMERGENCY": "#f9d79c", "LABORATORY": "#85c1b3", "RADIOLOGIST": "#a9c9f0", "NURSE": "#d4a574", "PHARMACY": "#92d1a3", } # fmt: off SequenceVisualizer.histogram( show_as="occurrence", bar_order="descending" ) \ .title("Healthcare Visit Distribution - All Patients") \ .colors(color_map_event) \ .legend(show=True, title="Visit Types", loc="upper right") \ .x_axis(label="Visit Types") \ .y_axis(label="Frequency") \ .draw(event_pool) # fmt: on # Visualize health state distribution color_map_state = { "HEALTHY": "#28a745", "SICK": "#dc3545", "TREATMENT": "#007bff", "REMISSION": "#6f42c1", "CHRONIC_MONITORING": "#fd7e14", "CONVALESCENCE": "#20c997", } # fmt: off SequenceVisualizer.histogram( show_as="occurrence", bar_order="descending" ) \ .title("Health State Distribution - All Patients") \ .colors("Set1") \ .legend(show=True, title="Health States", loc="upper right") \ .x_axis(label="Health States") \ .y_axis(label="Frequency") \ .draw(state_pool) # fmt: on ## 2. Basic Filtering with Query Criterion Query criteria provide pandas-style filtering capabilities for sequence data. They enable precise selection of entities or sequences based on attribute values, supporting both simple conditions and complex logical expressions.
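As a rough illustration of what "pandas-style" means here, the sketch below runs the same kind of expressions directly through `pandas.DataFrame.query` on a toy table. It does not use TanaT at all; the `events` table and its values are made up for the example, and only the column name `visit_type` is borrowed from the tutorial.

```python
import pandas as pd

# Toy event table (hypothetical data); column names mirror the tutorial's simulated dataset.
events = pd.DataFrame(
    {
        "id": ["seq-0", "seq-0", "seq-1", "seq-1", "seq-2"],
        "visit_type": ["EMERGENCY", "NURSE", "SPECIALIST", "LABORATORY", "RADIOLOGIST"],
    }
)

# Simple equality condition.
emergency_rows = events.query("visit_type == 'EMERGENCY'")

# Membership test with the `in` operator.
imaging_or_specialist = events.query("visit_type in ['SPECIALIST', 'RADIOLOGIST']")

# Several conditions combined with `and`.
combined = events.query("id == 'seq-1' and visit_type != 'LABORATORY'")

print(len(emergency_rows), len(imaging_or_specialist), len(combined))
```

Testing an expression on a small DataFrame like this is a cheap way to check its syntax before wrapping it in a criterion.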
### Understanding Filtering Levels TanaT supports three filtering levels: - **Entity-level**: Filters individual records within sequences - **Sequence-level**: Filters entire sequences based on whether they contain matching entities - **Trajectory-level**: Not shown here. See [Trajectory data wrangling](./data_wrangling_trajectory.ipynb). ### Entity-Level Filtering Extract specific types of entities across all sequences. This preserves the sequence structure but only includes matching entities. # Create criterion for emergency visits emergency_criterion = QueryCriterion(query="visit_type == 'EMERGENCY'") # Apply entity-level filtering emergency_entities = event_pool.filter(emergency_criterion, level="entity") # Visualize the impact of filtering on event pool # fmt: off SequenceVisualizer.histogram( show_as="occurrence", bar_order="descending" ) \ .title("Emergency Visit Entities Only") \ .colors(color_map_event) \ .legend(show=True, title="Visit Types", loc="upper right") \ .x_axis(label="Visit Types") \ .y_axis(label="Frequency") \ .draw(emergency_entities) # fmt: on # Examine the filtered emergency visit data emergency_entities ### Sequence-Level Filtering Select entire sequences that contain at least one entity matching the criterion. This maintains complete sequence context. 
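To make the difference between the two levels concrete, here is a plain-pandas sketch of the semantics described above. It is only an analogy, assuming one row per entity and an `id` column per sequence; it says nothing about how TanaT implements `filter()` internally, and the data is invented.

```python
import pandas as pd

# Hypothetical event table: one row per entity, `id` identifies the sequence.
events = pd.DataFrame(
    {
        "id": ["seq-0", "seq-0", "seq-1", "seq-1", "seq-2"],
        "visit_type": ["EMERGENCY", "NURSE", "SPECIALIST", "EMERGENCY", "NURSE"],
    }
)

is_emergency = events["visit_type"] == "EMERGENCY"

# Entity level: keep only the matching rows.
entity_level = events[is_emergency]

# Sequence level: keep every row of any sequence that has at least one match.
matching_ids = events.loc[is_emergency, "id"].unique()
sequence_level = events[events["id"].isin(matching_ids)]

print(len(entity_level), len(sequence_level))
```

In this toy table the entity-level result has two rows, while the sequence-level result keeps all four rows of `seq-0` and `seq-1` and drops `seq-2` entirely.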
# Filter for sequences containing emergency visits sequences_with_emergency = event_pool.filter( emergency_criterion, level="sequence", ) # Examine the results sequences_with_emergency # Visualize the impact of emergency visit filtering # fmt: off SequenceVisualizer.histogram( show_as="occurrence", bar_order="descending" ) \ .title("Visit Distribution in Emergency-Containing Sequences") \ .colors(color_map_event) \ .legend(show=True, title="Visit Types", loc="upper right") \ .x_axis(label="Visit Types") \ .y_axis(label="Frequency") \ .draw(sequences_with_emergency) # fmt: on ### Multi-Condition Query Filtering Use richer pandas query expressions, such as the `in` operator, to match several values in a single criterion. # Filter for specialist or radiologist visits using 'in' operator specialist_criterion = QueryCriterion( query="visit_type in ['SPECIALIST', 'RADIOLOGIST']" ) specialist_sequences = event_pool.filter(specialist_criterion, level="sequence") # Examine the results specialist_sequences ## 3. Pattern-Based Filtering Pattern criteria enable sophisticated sequence pattern matching beyond simple value filtering. They support single value matching, sequential patterns, and regular expressions for complex data extraction scenarios. ### Single Value Pattern Matching Identify sequences containing specific entity values. This is useful for finding all sequences with particular events or states.
# Find sequences containing treatment state treatment_pattern = PatternCriterion( pattern={"health_state": "TREATMENT"}, contains=True, # Check if pattern exists anywhere in sequence ) treatment_sequences = state_pool.filter(treatment_pattern, level="sequence") # Examine treatment sequences treatment_sequences # Examine health state distribution in treatment sequences # fmt: off SequenceVisualizer.histogram( show_as="frequency", bar_order="descending" ) \ .title("Health State Distribution in Treatment Sequences") \ .colors(color_map_state) \ .legend(show=True, title="Health States", loc="upper right") \ .x_axis(label="Health States") \ .y_axis(label="Frequency") \ .draw(treatment_sequences) # fmt: on ### Sequential Pattern Matching Find sequences containing specific ordered patterns. This identifies disease progression patterns or care pathways. # Find sequences with SICK followed by TREATMENT progression sick_to_treatment = PatternCriterion( pattern={"health_state": ["SICK", "TREATMENT"]}, # Pattern can occur anywhere in sequence contains=True, ) recovery_sequences = state_pool.filter(sick_to_treatment, level="sequence") # Examine recovery sequence characteristics recovery_sequences # Examine a specific recovery sequence example example_id = list(recovery_sequences.unique_ids)[0] example_sequence = recovery_sequences[example_id] example_sequence # Visualize recovery patterns with timeline # fmt: off SequenceVisualizer.timeline( stacking_mode="flat", relative_time=True, granularity="day" ) \ .title("Recovery Progression Patterns (SICK → TREATMENT)") \ .colors(color_map_state) \ .marker(spacing=0.8, alpha=0.8) \ .legend(show=True, title="Health States", loc="upper right") \ .x_axis(label="Days from Start") \ .draw(recovery_sequences) # fmt: on ### Regular Expression Pattern Matching Use regex patterns for flexible string matching in entity attributes. This enables complex pattern matching on text data. 
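The idea of a sequential pattern with a regex element can be sketched in pure Python as follows. This is not TanaT's matcher: the helper names `matches` and `contains_pattern` are hypothetical, the `regex:` prefix handling is modelled on the pattern syntax used in this tutorial, and the sketch assumes that pattern elements must match adjacent entities, which the text here does not confirm.

```python
import re


def matches(element, label):
    """Compare one pattern element with one label; a 'regex:' prefix marks a regular expression."""
    if label is None:  # missing values never match
        return False
    if element.startswith("regex:"):
        return re.search(element[len("regex:"):], label) is not None
    return element == label


def contains_pattern(labels, pattern):
    """Return True if `pattern` matches a run of adjacent labels (adjacency is an assumption)."""
    n = len(pattern)
    return any(
        all(matches(p, lab) for p, lab in zip(pattern, labels[i:i + n]))
        for i in range(len(labels) - n + 1)
    )


visits = ["NURSE", "SPECIALIST", "LABORATORY", "PHARMACY"]
print(contains_pattern(visits, ["regex:^S", "LABORATORY"]))  # True
print(contains_pattern(visits, ["regex:^S", "PHARMACY"]))    # False
```

If the library instead allows gaps between pattern elements, the sliding window above would need to be replaced by a subsequence search.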
# Find sequences where a visit type starting with 'S' (here, SPECIALIST) is followed by LABORATORY regex_pattern = PatternCriterion( pattern={"visit_type": ["regex:^S", "LABORATORY"]}, # Sequential pattern with regex contains=True, ) specialist_lab_sequences = event_pool.filter(regex_pattern, level="sequence") # Examine the results specialist_lab_sequences ## 4. Static Data Operations Static criteria enable filtering based on patient demographics and clinical characteristics that remain constant throughout the observation period. This is essential for cohort selection and demographic analysis. ### Multi-Condition Static Filtering Filter patients based on multiple demographic and clinical criteria simultaneously. # Define criterion for elderly patients with chronic conditions elderly_chronic_criterion = StaticCriterion( query="age > 65 and chronic_condition == True" ) print("Filtering for elderly patients (>65) with chronic conditions...") # Apply filtering across all sequence types elderly_chronic_events = event_pool.filter(elderly_chronic_criterion) elderly_chronic_states = state_pool.filter(elderly_chronic_criterion) elderly_chronic_intervals = interval_pool.filter(elderly_chronic_criterion) print("Static filtering results:") print(f"Event sequences: {len(elderly_chronic_events)} patients") print(f"State sequences: {len(elderly_chronic_states)} patients") print(f"Interval sequences: {len(elderly_chronic_intervals)} patients") # Examine patient characteristics in filtered cohort print("Characteristics of elderly chronic patients:") cohort_data = elderly_chronic_events.static_data cohort_data.describe(include="all") ### Risk Stratification Filtering Identify patient cohorts based on clinical risk levels for targeted analysis.
# Filter for high-risk patients high_risk_criterion = StaticCriterion(query="risk_level == 'HIGH'") high_risk_sequences = event_pool.filter(high_risk_criterion) print("Risk-based filtering results:") print(f"High-risk patients: {len(high_risk_sequences)}") print(f"Percentage of total: {len(high_risk_sequences)/len(event_pool)*100:.1f}%") # Compare risk levels with other characteristics risk_analysis = high_risk_sequences.static_data.copy() print("High-risk patient characteristics:") print(f"Mean age: {risk_analysis['age'].mean():.1f} years") print( f"Chronic condition rate: {risk_analysis['chronic_condition'].astype(bool).mean()*100:.1f}%" ) print(f"Mean comorbidities: {risk_analysis['comorbidity_count'].mean():.1f}") ## -- Overview of high-risk sequences -- ## high_risk_sequences ## 5. Time Window Filtering Time criteria enable filtering based on temporal characteristics, allowing precise selection of entities or sequences within specific time windows. This is crucial for longitudinal studies and time-bounded analyses. ### Entity-Level Time Filtering Filter individual entities (events, states, intervals) that fall within specified time boundaries.
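The "entirely within the window" rule can be sketched with plain pandas boolean masks. The table, dates, and window below are hypothetical, and the inclusive comparisons (`>=`, `<=`) are an assumption; whether TanaT treats its window bounds as inclusive or strict is not stated here.

```python
import pandas as pd

# Hypothetical medication intervals, one row per entity.
intervals = pd.DataFrame(
    {
        "id": ["seq-0", "seq-0", "seq-1"],
        "medication": ["INSULIN", "ANTIBIOTIC", "ANTICOAGULANT"],
        "start_date": pd.to_datetime(["2024-01-05", "2024-02-20", "2023-12-15"]),
        "end_date": pd.to_datetime(["2024-01-20", "2024-04-10", "2024-01-10"]),
    }
)

window_start = pd.Timestamp("2024-01-01")
window_end = pd.Timestamp("2024-03-31")

# Entirely inside the window: starts on/after the lower bound and ends on/before the upper bound.
fully_inside = intervals[
    (intervals["start_date"] >= window_start) & (intervals["end_date"] <= window_end)
]

# Overlapping the window: a looser rule, shown only for contrast.
overlapping = intervals[
    (intervals["start_date"] <= window_end) & (intervals["end_date"] >= window_start)
]

print(fully_inside["medication"].tolist(), len(overlapping))
```

Only the INSULIN interval lies fully inside the window, whereas all three intervals overlap it, which shows how much the choice of rule can change the size of the filtered dataset.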
# Define recent time window (last 3 months) recent_start = datetime.now() - timedelta(days=90) recent_end = datetime.now() recent_time_criterion = TimeCriterion( start_after=recent_start, end_before=recent_end, # Entity must be entirely within time range duration_within=True, ) print(f"Filtering for entities between: [{recent_start.date()}, {recent_end.date()}]") # Apply time filtering to interval sequences (medication periods) recent_intervals = interval_pool.filter( recent_time_criterion, level="entity", ) print("Recent medication intervals:") print(f"Entities in time window: {len(recent_intervals.sequence_data)}") print(f"Original entities: {len(interval_pool.sequence_data)}") print(f"Sequences affected: {len(recent_intervals)}") # Examine recent medication data recent_intervals.sequence_data.head() # Visualize medication duration distribution in recent intervals # fmt: off SequenceVisualizer.histogram( show_as="time_spent", bar_order="descending", orientation="horizontal" ) \ .title("Recent Medication Duration Analysis") \ .colors("Spectral") \ .legend(show=True, title="Medications", loc="lower right") \ .x_axis(label="Total Duration") \ .y_axis(label="Medication Types") \ .draw(recent_intervals) # fmt: on ### Sequence-Level Time Filtering Filter entire sequences that fall within specified time boundaries, maintaining complete sequence context. # Define historical time window (1 year ago to 3 months ago) historical_start = datetime.now() - timedelta(days=365) historical_end = datetime.now() - timedelta(days=90) historical_time_criterion = TimeCriterion( start_after=historical_start, end_before=historical_end, sequence_within=True, # Entire sequence must be within time range ) # Apply sequence-level time filtering historical_events = event_pool.filter(historical_time_criterion, level="sequence") ## Report results historical_events ## 6. Length-Based Filtering Length criteria enable filtering sequences based on the number of entities they contain.
This is useful for ensuring sufficient data for analysis or identifying outlier sequences. ### Filtering for Extended Sequences Identify sequences with sufficient data points for robust analysis. # Filter for sequences with more than 8 entities long_sequences_criterion = LengthCriterion(gt=8) long_event_sequences = event_pool.filter(long_sequences_criterion) ## Report results print("\nLong event sequences overview:") long_event_sequences ### Filtering for Concise Sequences Identify sequences with limited data points, which may require different analytical approaches. # Filter for sequences with 5 or fewer entities short_sequences_criterion = LengthCriterion(le=5) short_event_sequences = event_pool.filter(short_sequences_criterion) print("Short sequence filtering:") print(f"Sequences with ≤5 entities: {len(short_event_sequences)}") print(f"Percentage of total: {len(short_event_sequences)/len(event_pool)*100:.1f}%") ## Report results short_event_sequences ## 7. Handling Missing Data Missing data is common in healthcare sequences due to incomplete recording, system limitations, or patient non-adherence. TanaT provides tools to identify, analyze, and handle missing values appropriately. ### Detecting Missing Data Patterns First, identify the presence and extent of missing data in the dataset. # Check vocabulary to see if missing values (None) are present print("Dataset vocabulary analysis:") print(f"Event vocabulary: {event_pool.vocabulary}") print(f"Missing values present: {None in event_pool.vocabulary}") ### Entity-Level Missing Data Analysis Identify individual entities with missing attribute values. 
# Find entities with missing visit types missing_visits_criterion = QueryCriterion(query="visit_type.isna()") missing_visit_entities = event_pool.filter(missing_visits_criterion, level="entity") # Report missing visit entities missing_visit_entities ### Sequence-Level Missing Data Analysis Identify sequences that contain any missing values, which may need special handling. # Find sequences containing missing values sequences_with_missing = event_pool.filter(missing_visits_criterion, level="sequence") # Report sequences with missing values sequences_with_missing.unique_ids ### Data Cleaning: Removing Missing Values Create clean datasets by filtering out entities with missing values. # Create clean dataset by removing entities with missing values clean_data_criterion = QueryCriterion(query="visit_type.notna()") clean_event_pool = event_pool.filter(clean_data_criterion, level="entity") print("Data cleaning results:") print(f"Original entities: {len(event_pool.sequence_data)}") print(f"Clean entities: {len(clean_event_pool.sequence_data)}") print( f"Entities removed: {len(event_pool.sequence_data) - len(clean_event_pool.sequence_data)}" ) # Verify data cleaning effectiveness print("Data quality verification:") print(f"Vocabulary before cleaning: {event_pool.vocabulary}") print(f"Vocabulary after cleaning: {clean_event_pool.vocabulary}") print(f"Missing values eliminated: {None not in clean_event_pool.vocabulary}") ## 8. Reference Date Management Reference dates (T0) enable temporal alignment of sequences by establishing a common starting point. This is essential for comparative analysis and cohort studies where events need to be aligned relative to a specific milestone. ### Event-Based Reference Dating Set reference dates based on the occurrence of specific events in each sequence. 
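Conceptually, event-based reference dating amounts to picking one date per sequence and measuring every other timestamp against it. The plain-pandas sketch below illustrates that idea on invented data; it is not how TanaT computes T0, and the column names `t0` and `days_from_t0` are hypothetical.

```python
import pandas as pd

# Hypothetical event table; seq-1 has no EMERGENCY visit.
events = pd.DataFrame(
    {
        "id": ["seq-0", "seq-0", "seq-0", "seq-1", "seq-1"],
        "date": pd.to_datetime(
            ["2024-01-01", "2024-01-10", "2024-01-15", "2024-02-01", "2024-02-03"]
        ),
        "visit_type": ["NURSE", "EMERGENCY", "PHARMACY", "SPECIALIST", "NURSE"],
    }
)

# T0 per sequence: date of the first EMERGENCY visit.
t_zero = (
    events[events["visit_type"] == "EMERGENCY"]
    .sort_values("date")
    .groupby("id")["date"]
    .first()
)

# Map T0 back onto every row; sequences without a matching event get NaT.
events["t0"] = events["id"].map(t_zero)
events["days_from_t0"] = (events["date"] - events["t0"]).dt.days

# Keep only rows that have a valid reference date.
aligned = events.dropna(subset=["t0"])
print(aligned[["id", "visit_type", "days_from_t0"]])
```

Entities before the reference event get negative offsets, and sequences with no matching event have no T0 at all, so they drop out of the aligned table.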
# Set T0 based on first emergency visit occurrence emergency_t0_pool = event_pool.copy() emergency_t0_pool.zero_from_query( query="visit_type == 'EMERGENCY'", # Use first occurrence if multiple emergency visits use_first=True, ) # Examine reference dates pd.DataFrame.from_dict( emergency_t0_pool.t_zero, orient="index", columns=["T0 Date"], ) ### Temporal Transformation: Relative Time Convert absolute timestamps to relative time from the reference date for temporal analysis. # Transform to relative time (days from emergency visit) emergency_t0_pool.to_relative_time( granularity="day", drop_na=True, # Remove entities without valid T0 ) # Visualize temporal alignment impact # fmt: off SequenceVisualizer.timeline( relative_time=True, stacking_mode="flat", granularity="day" ) \ .title("Healthcare Visits Aligned to Emergency Visit (T0)") \ .colors(color_map_event) \ .marker(size=10, alpha=0.9) \ .legend(show=True, title="Visit Types", loc="upper right") \ .x_axis(label="Days from Emergency Visit") \ .set_theme("dark_background") \ .draw(emergency_t0_pool) # fmt: on ### Position-Based Reference Dating Alternative approach: set reference dates based on sequence position rather than event content. # Set T0 based on the third event in each sequence position_t0_pool = event_pool.copy() # 0-indexed: position 2 = 3rd event position_t0_pool.zero_from_position(position=2) # Examine position-based reference dates pd.DataFrame.from_dict( position_t0_pool.t_zero, orient="index", columns=["T0 Date"], ) ### Temporal Transformation: Relative Rank Convert to ordinal positions relative to the reference point. # Transform to relative rank (ordinal positions from T0) position_t0_pool.to_relative_rank() ## 9. Advanced Sequence Filtering Advanced filtering combines multiple criteria to create sophisticated patient cohorts. TanaT supports both set-based operations (intersection, union) and sequential filtering approaches for complex data extraction scenarios.
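The relationship between the two approaches can be checked with ordinary Python sets. In the sketch below the three ID sets are hypothetical stand-ins for the results of three separate criteria; no TanaT call is involved.

```python
# Hypothetical ID sets, standing in for the results of three separate criteria.
high_risk_ids = {"seq-1", "seq-2", "seq-5", "seq-8"}
older_ids = {"seq-2", "seq-3", "seq-5", "seq-9"}
emergency_ids = {"seq-0", "seq-2", "seq-5", "seq-7"}

# Set-based: evaluate each criterion on the full population, then intersect.
set_based = high_risk_ids & older_ids & emergency_ids

# Sequential: each step only keeps IDs that survived the previous one.
population = {f"seq-{i}" for i in range(10)}
step1 = {pid for pid in population if pid in high_risk_ids}
step2 = {pid for pid in step1 if pid in older_ids}
sequential = {pid for pid in step2 if pid in emergency_ids}

# Union answers a different question: IDs meeting at least one criterion.
any_criterion = high_risk_ids | older_ids | emergency_ids

print(sorted(set_based))
print(set_based == sequential)
print(len(any_criterion))
```

Because each criterion depends only on the individual being tested, intersecting independent results and chaining filters select the same IDs. Comparing the ID sets themselves, rather than only their sizes, is the stricter way to verify that equivalence.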
### Set-Based Cohort Selection Use the `which()` method to identify patient IDs meeting specific criteria, then combine using set operations. # Step 1: Identify high-risk patients high_risk_ids = event_pool.which(StaticCriterion(query="risk_level == 'HIGH'")) print(f"Step 1 - High-risk patients: {len(high_risk_ids)}") # Step 2: Identify elderly patients elderly_ids = event_pool.which(StaticCriterion(query="age > 50")) print(f"Step 2 - Elderly patients (>50): {len(elderly_ids)}") # Step 3: Identify patients with emergency visits emergency_ids = event_pool.which(QueryCriterion(query="visit_type == 'EMERGENCY'")) print(f"Step 3 - Patients with emergency visits: {len(emergency_ids)}") # Step 4: Find intersection of all three criteria intersection_ids = high_risk_ids.intersection(elderly_ids).intersection(emergency_ids) print(f"Step 4 - Final cohort intersection: {len(intersection_ids)}") print( f"Selection rate: {len(intersection_ids)/len(event_pool)*100:.1f}% of total patients" ) # Create filtered sequence pool from intersection intersection_pool = event_pool.subset(intersection_ids) ## Report results intersection_pool ### Sequential Filtering Approach Alternative method: apply filters sequentially using the `filter()` method for the same result.
# Sequential filtering approach # Filter 1: High-risk patients high_risk_pool = event_pool.filter(StaticCriterion(query="risk_level == 'HIGH'")) # Report high-risk pool high_risk_pool # Filter 2: Among high-risk, select elderly patients elderly_high_risk_pool = high_risk_pool.filter(StaticCriterion(query="age > 50")) print(f"After elderly filter: {len(elderly_high_risk_pool)} patients") # Filter 3: Among elderly high-risk, find those with emergency visits final_cohort_pool = elderly_high_risk_pool.filter( QueryCriterion(query="visit_type == 'EMERGENCY'"), level="sequence" ) print(f"Final cohort: {len(final_cohort_pool)} patients") # Verify both approaches yield identical results print("Verification of filtering approaches:") print(f"Set-based approach: {len(intersection_pool)} patients") print(f"Sequential approach: {len(final_cohort_pool)} patients") print(f"Results identical: {len(intersection_pool) == len(final_cohort_pool)}") ## Report final cohort final_cohort_pool ---------------------------------------- ## Data Wrangling for Trajectories # Data Wrangling for Trajectories This notebook demonstrates advanced data wrangling techniques for trajectory data in *TanaT*. Trajectories represent the complete patient journey by combining multiple sequence types (events, states, intervals) into unified analytical structures. This multi-dimensional approach enables sophisticated filtering and analysis across different temporal data modalities. 
These techniques are essential for: - Preparing complex multi-modal datasets for machine learning models - Extracting patient cohorts based on cross-sequence patterns - Analyzing care pathways and treatment trajectories - Creating comprehensive analysis-ready healthcare datasets ### Required imports from datetime import datetime import pandas as pd import numpy as np # Simulation imports from tanat.dataset.simulation.sequence import ( generate_event_sequences, generate_interval_sequences, generate_state_sequences, ) # Sequence pools from tanat.sequence import ( EventSequencePool, StateSequencePool, IntervalSequencePool, ) # Trajectory pool from tanat.trajectory import TrajectoryPool # Criterion for filtering from tanat.criterion import ( QueryCriterion, PatternCriterion, StaticCriterion, ) # Visualization from tanat.visualization.sequence import SequenceVisualizer ## 1. Trajectory Data Setup Trajectories integrate multiple sequence types to represent complete patient journeys. We'll create a comprehensive healthcare dataset with events (visits), states (health conditions), and intervals (medication periods) that reflect real-world clinical complexity. ### Dataset Configuration Define parameters for our multi-modal healthcare trajectory dataset. # Dataset parameters N_SEQ = 500 # Number of patient trajectories SIZE_DISTRIBUTION = [4, 5, 6, 7, 8, 9, 10, 11, 12] SEED = 42 # For reproducible results ### Medical Visit Events Generate healthcare visit sequences representing diverse clinical encounters and care coordination. 
# Comprehensive healthcare visit types including surgical procedures visit_data = generate_event_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=[ "GENERAL_PRACTITIONER", "SPECIALIST", "EMERGENCY", "LABORATORY", "RADIOLOGIST", "SURGERY", "PHARMACY", ], missing_data=0.12, entity_feature="visit_type", seed=SEED, ) visit_data.describe(include="all") # Display summary statistics ### Health State Sequences Generate health condition sequences with expanded vocabulary including disease progression and recovery patterns. # Extended health states covering full disease trajectory health_data = generate_state_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=[ "HEALTHY", "ACUTE_ILLNESS", "TREATMENT", "RECOVERY", "CHRONIC_MONITORING", "REMISSION", "DETERIORATION", ], missing_data=0.08, # Lower missing rate for health states entity_feature="health_state", seed=SEED, ) health_data.describe(include="all") # Display summary statistics ### Medication Interval Sequences Generate medication prescription periods including high-intensity treatments for complex conditions. # Comprehensive medication categories including intensive treatments medication_data = generate_interval_sequences( n_seq=N_SEQ, seq_size=SIZE_DISTRIBUTION, vocabulary=[ "ANTIBIOTIC", "PAIN_RELIEVER", "CORTICOSTEROID", "ANTICOAGULANT", "ANTIHYPERTENSIVE", "INSULIN", "CHEMOTHERAPY", ], missing_data=0.15, entity_feature="medication", seed=SEED, ) medication_data.describe(include="all") # Display summary statistics ### Data Generation Summary Overview of the multi-modal trajectory components we'll integrate. 
print("Trajectory dataset components:") print(f"- Medical visits: {len(visit_data)} records") print(f"- Health states: {len(health_data)} records") print(f"- Medication periods: {len(medication_data)} records") print(f"- Total patients: {N_SEQ}") ### Comprehensive Patient Demographics Generate rich patient profiles with demographics, clinical characteristics, and risk factors that reflect real-world healthcare complexity. def generate_comprehensive_patient_data(n_patients, seed=42): """ Generate comprehensive patient demographic and clinical data with age-dependent risk factors. Parameters: - n_patients: Number of patient records to generate - seed: Random seed for reproducibility Returns: - DataFrame with comprehensive patient characteristics """ np.random.seed(seed) patients = [] for i in range(n_patients): patient_id = f"seq-{i}" age = np.random.randint(18, 90) # Age-dependent chronic condition probability (realistic healthcare patterns) chronic_prob = 0.2 if age < 40 else 0.4 if age < 65 else 0.7 patients.append( { "id": patient_id, "age": age, "age_group": "18-40" if age < 40 else "40-65" if age < 65 else "65+", "gender": np.random.choice(["M", "F"], p=[0.48, 0.52]), "insurance_type": np.random.choice( ["PUBLIC", "PRIVATE", "MIXED"], p=[0.6, 0.25, 0.15] ), "chronic_condition": np.random.choice( [True, False], p=[chronic_prob, 1 - chronic_prob] ), "risk_level": np.random.choice( ["LOW", "MEDIUM", "HIGH"], p=[0.5, 0.3, 0.2] ), "comorbidity_count": np.random.poisson(1.5), "bmi_category": np.random.choice( ["NORMAL", "OVERWEIGHT", "OBESE"], p=[0.4, 0.35, 0.25] ), "smoking_status": np.random.choice( ["NEVER", "FORMER", "CURRENT"], p=[0.6, 0.25, 0.15] ), "family_history": np.random.choice([True, False], p=[0.3, 0.7]), } ) return pd.DataFrame(patients) ### Generate and Examine Patient Data Create comprehensive patient demographics with realistic healthcare risk factor distributions. 
# Generate comprehensive patient demographics static_data = generate_comprehensive_patient_data(N_SEQ, seed=SEED) # Examine demographic and clinical distributions static_data.describe(include="all") ### Trajectory Pool Architecture Trajectory pools integrate multiple sequence types into unified analytical structures. Each sequence type maintains its specific temporal properties while enabling cross-sequence filtering and analysis. ### Define Shared Static Features Establish the comprehensive set of patient characteristics that will be available across all sequence types. # Comprehensive static feature set for trajectory analysis static_features = [ "age", "age_group", "gender", "insurance_type", "chronic_condition", "risk_level", "comorbidity_count", "bmi_category", "smoking_status", "family_history", ] print(f"Static features for trajectory integration: {len(static_features)} variables") print("Features:", static_features) ### Medical Visit Sequence Pool Setup Configure the event pool to handle healthcare visit sequences with comprehensive patient demographics. # Create medical visit sequence pool visit_pool = EventSequencePool( visit_data, { "id_column": "id", "time_column": "date", "entity_features": ["visit_type"], "static_features": static_features, }, static_data=static_data, ) # Report pool creation visit_pool ### Health State Sequence Pool Setup Configure the state pool to handle health condition sequences with temporal persistence. # Create health state sequence pool health_pool = StateSequencePool( health_data, { "id_column": "id", "start_column": "start_date", "default_end_value": datetime.now(), # Avoid warnings for open-ended states "entity_features": ["health_state"], "static_features": static_features, }, static_data=static_data, ) # Report pool creation health_pool ### Medication Sequence Pool Setup Configure the interval pool to handle medication periods with defined durations. 
# Create medication interval sequence pool medication_pool = IntervalSequencePool( medication_data, { "id_column": "id", "start_column": "start_date", "end_column": "end_date", "entity_features": ["medication"], "static_features": static_features, }, static_data=static_data, ) # Report pool creation medication_pool ### Trajectory Pool Integration Combine all sequence pools into a unified trajectory structure for comprehensive multi-modal analysis. # Initialize empty trajectory pool and add sequence pools trajectory_pool = TrajectoryPool.init_empty() trajectory_pool.add_sequence_pool(visit_pool, "medical_visits") trajectory_pool.add_sequence_pool(health_pool, "health_states") trajectory_pool.add_sequence_pool(medication_pool, "medications") # Report sequence pool integration trajectory_pool # Add comprehensive static features to trajectory pool trajectory_pool.add_static_features( static_data, id_column="id", static_features=static_features ) # Configure trajectory pool settings trajectory_pool.update_settings( # Use union of IDs across sequence pools intersection=False, ) # Display final trajectory pool summary trajectory_pool ### Multi-Modal Data Distribution Examine the distribution across all sequence types in our trajectory dataset before applying filtering operations. 
# Visualize visit type distribution across all trajectories color_map_visit = { "GENERAL_PRACTITIONER": "#1f77b4", "SPECIALIST": "#ff7f0e", "SURGERY": "#2ca02c", "RADIOLOGIST": "#d62728", "LABORATORY": "#9467bd", "EMERGENCY": "#e377c2", "PHARMACY": "#7f7f7f", } # fmt: off SequenceVisualizer.histogram( show_as="occurrence", bar_order="descending" ) \ .title("Medical Visit Distribution - Complete Trajectory Dataset") \ .colors(color_map_visit) \ .legend(show=True, title="Visit Types", loc="upper right") \ .x_axis(label="Visit Types") \ .y_axis(label="Frequency") \ .draw(visit_pool) # fmt: on # Visualize medication duration patterns color_map_medication = { "ANTIBIOTIC": "#FF6B6B", "PAIN_RELIEVER": "#4ECDC4", "CORTICOSTEROID": "#45B7D1", "ANTICOAGULANT": "#96CEB4", "ANTIHYPERTENSIVE": "#FFEAA7", "INSULIN": "#DDA0DD", "CHEMOTHERAPY": "#FF8C94", } # fmt: off SequenceVisualizer.histogram( show_as="time_spent", bar_order="descending", orientation="horizontal" ) \ .title("Medication Duration Patterns - All Trajectories") \ .colors(color_map_medication) \ .legend(show=True, title="Medications", loc="lower right") \ .x_axis(label="Total Duration") \ .y_axis(label="Medication Types") \ .draw(medication_pool) # fmt: on ## 2. Static Data Operations Static criteria enable filtering entire trajectories based on patient demographics and clinical characteristics. This is essential for creating targeted patient cohorts and population-based analyses. ### Single Criterion Trajectory Filtering Filter trajectories based on individual patient characteristics to create focused analytical cohorts.
# Filter for high-risk patients across all trajectory components high_risk_criterion = StaticCriterion(query="risk_level == 'HIGH'") high_risk_trajectories = trajectory_pool.filter(high_risk_criterion, level="trajectory") # Report high-risk cohort selection high_risk_trajectories ### Multi-Criteria Trajectory Filtering Combine multiple patient characteristics for sophisticated cohort selection and population targeting. # Define complex clinical cohort: elderly patients with multiple risk factors # Note: chronic_condition and family_history are categorical ("True"/"False" strings) complex_criterion = StaticCriterion( query="age >= 50 and chronic_condition == 'True' and comorbidity_count >= 2" ) complex_cohort = trajectory_pool.filter(complex_criterion, level="trajectory") # Report complex cohort selection complex_cohort # Compare cohort selection strategies elderly_only = trajectory_pool.filter( StaticCriterion(query="age >= 65"), level="trajectory" ) chronic_only = trajectory_pool.filter( StaticCriterion(query="chronic_condition == 'True'"), level="trajectory" ) print("Cohort selection comparison:") print(f"- Elderly only (≥65): {len(elderly_only)} trajectories") print(f"- Chronic condition only: {len(chronic_only)} trajectories") print(f"- Complex multi-criteria: {len(complex_cohort)} trajectories") ## 3. Sequence-Specific Filtering Trajectory filtering can target specific sequence types within the multi-modal structure. This enables precise selection based on patterns in medical visits, health states, or medication usage while maintaining the complete trajectory context. ### Medical Visit Pattern Filtering Identify trajectories based on specific healthcare utilization patterns and care pathways.
# Examine the medical visit sequence pool structure print("Medical visit sequence pool overview:") print(f"Total patients: {len(trajectory_pool.sequence_pools['medical_visits'])}") print(f"Visit types: {trajectory_pool.sequence_pools['medical_visits'].vocabulary}") # Filter trajectories containing emergency visits emergency_criterion = QueryCriterion(query="visit_type == 'EMERGENCY'") emergency_trajectories = trajectory_pool.filter( emergency_criterion, level="sequence", sequence_name="medical_visits", # Propagate filtering to trajectory level intersection=True, ) # Report emergency visit analysis emergency_trajectories # Filter trajectories containing surgical procedures surgery_criterion = QueryCriterion(query="visit_type == 'SURGERY'") surgery_trajectories = trajectory_pool.filter( surgery_criterion, level="sequence", sequence_name="medical_visits", intersection=True, # Propagate filtering to trajectory level ) # Report surgery visit analysis surgery_trajectories # Analyze care pathway complexity: trajectories with both emergency and surgery emergency_ids = emergency_trajectories.unique_ids surgery_ids = surgery_trajectories.unique_ids complex_care_ids = emergency_ids.intersection(surgery_ids) print("Complex care pathway analysis:") print(f"Emergency only: {len(emergency_ids - surgery_ids)} trajectories") print(f"Surgery only: {len(surgery_ids - emergency_ids)} trajectories") print(f"Both emergency and surgery: {len(complex_care_ids)} trajectories") print(f"Complex care rate: {len(complex_care_ids)/len(trajectory_pool)*100:.1f}%") # Visualize visit patterns in complex care trajectories complex_care_trajectories = trajectory_pool.subset(complex_care_ids) # fmt: off SequenceVisualizer.histogram( show_as="occurrence", bar_order="descending" ) \ .title("Visit Patterns in Complex Care Trajectories (Emergency + Surgery)") \ .colors(color_map_visit) \ .legend(show=True, title="Visit Types", loc="upper right") \ .x_axis(label="Visit Types") \ .y_axis(label="Frequency") 
\ .draw(complex_care_trajectories.sequence_pools["medical_visits"]) # fmt: on ### Health State Pattern Analysis Identify trajectories based on disease progression patterns and health state transitions. # Filter trajectories containing treatment states treatment_criterion = PatternCriterion( pattern={"health_state": "TREATMENT"}, contains=True ) treatment_trajectories = trajectory_pool.filter( treatment_criterion, level="sequence", sequence_name="health_states", # Propagate filtering to trajectory level intersection=True, ) # Report treatment state analysis treatment_trajectories # Filter trajectories with illness-to-treatment progression pattern progression_criterion = PatternCriterion( pattern={"health_state": ["ACUTE_ILLNESS", "TREATMENT"]}, contains=True, # Sequential pattern must exist somewhere in sequence ) progression_trajectories = trajectory_pool.filter( progression_criterion, level="sequence", sequence_name="health_states", intersection=True, # Propagate filtering to trajectory level ) # Report illness-to-treatment progression analysis progression_trajectories # Visualize health progression patterns with timeline # fmt: off SequenceVisualizer.timeline( stacking_mode="flat", relative_time=True, granularity="day" ) \ .title("Health Progression Patterns (Illness → Treatment)") \ .colors("RdYlBu") \ .marker(spacing=0.8, alpha=0.7) \ .legend(show=True, title="Health States", loc="upper right") \ .x_axis(label="Days from Start") \ .draw(progression_trajectories.sequence_pools["health_states"]) # fmt: on ### Medication Pattern Filtering Identify trajectories based on medication usage patterns and treatment intensity. 
# Define high-intensity medication categories intensive_medications = ["CHEMOTHERAPY", "CORTICOSTEROID", "INSULIN"] intensive_criterion = QueryCriterion(query=f"medication in {intensive_medications}") intensive_trajectories = trajectory_pool.filter( intensive_criterion, level="sequence", sequence_name="medications", intersection=True, # Propagate filtering to trajectory level ) # Report intensive medication analysis intensive_trajectories # Analyze medication duration patterns in intensive care trajectories # fmt: off SequenceVisualizer.histogram( show_as="time_spent", bar_order="descending", orientation="horizontal" ) \ .title("Medication Duration in Intensive Care Trajectories") \ .colors(color_map_medication) \ .legend(show=True, title="Medications", loc="lower right") \ .x_axis(label="Total Duration") \ .y_axis(label="Medication Types") \ .draw(intensive_trajectories.sequence_pools["medications"]) # fmt: on ## 4. Reference Date Management Trajectory-level reference dating enables temporal alignment across multiple sequence types. This is essential for analyzing care coordination, treatment timing, and cross-sequence temporal relationships. ### Default Temporal Alignment By default, trajectories use the earliest timestamp across all sequence types as the reference point. # Create copy for temporal alignment demonstration aligned_trajectory_pool = trajectory_pool.copy() # Apply default reference dating (first entity across all sequences) aligned_trajectory_pool.zero_from_position(0) # Mimic default behavior # Display T zero reference point pd.DataFrame.from_dict( aligned_trajectory_pool.t_zero, orient="index", columns=["T0 Date"], ) # Transform medical visits to relative time from default T0 # fmt: off aligned_trajectory_pool.sequence_pools["medical_visits"] \ .to_relative_time( granularity="day", drop_na=True, ) # fmt: on ### Event-Based Reference Dating Set common reference dates based on specific clinical events for targeted temporal analysis. 
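To make event-based alignment concrete, here is a minimal pandas-only sketch. It is not TanaT's implementation: the toy frames, their values, and the `relative_day` column are hypothetical. T0 is taken as each patient's first emergency visit, medication start dates are expressed in days relative to it, and patients without any emergency visit (no T0) are dropped, which is the behaviour that `drop_na=True` appears to request.

```python
import pandas as pd

# Hypothetical toy data (not the tutorial dataset)
visits = pd.DataFrame({
    "id": ["P1", "P1", "P2", "P3"],
    "date": pd.to_datetime(["2024-01-01", "2024-01-10", "2024-02-01", "2024-03-05"]),
    "visit_type": ["SPECIALIST", "EMERGENCY", "LABORATORY", "EMERGENCY"],
})
medications = pd.DataFrame({
    "id": ["P1", "P2", "P3"],
    "start_date": pd.to_datetime(["2024-01-15", "2024-02-03", "2024-03-01"]),
    "medication": ["ANTIBIOTIC", "INSULIN", "PAIN_RELIEVER"],
})

# T0 = first emergency visit per patient
t_zero = visits.query("visit_type == 'EMERGENCY'").groupby("id")["date"].min()

# Attach each patient's T0 to their medication rows (NaT when no emergency visit exists)
medications["t_zero"] = medications["id"].map(t_zero)
medications["relative_day"] = (medications["start_date"] - medications["t_zero"]).dt.days

# Drop rows that have no reference date
aligned = medications.dropna(subset=["t_zero"])
print(aligned[["id", "medication", "relative_day"]])
```

Negative values are expected: a medication that started before the emergency visit gets a negative relative day.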
# Create new copy for event-based alignment event_aligned_pool = trajectory_pool.copy() # Set T0 based on first emergency visit across trajectories event_aligned_pool.zero_from_query( query="visit_type == 'EMERGENCY'", # T0 = first emergency visit sequence_name="medical_visits", use_first=True, ) # Report T zero reference point pd.DataFrame.from_dict( event_aligned_pool.t_zero, orient="index", columns=["T0 Date"], ) # Transform medication data relative to emergency visit T0 # fmt: off event_aligned_pool.sequence_pools["medications"] \ .to_relative_time( granularity="day", drop_na=True, ) # fmt: on ## 5. Advanced Trajectory Filtering Advanced filtering combines multiple criteria and sequence types to create sophisticated patient cohorts. This multi-dimensional approach enables precise population targeting and complex analytical workflows. ### Multi-Dimensional Cohort Selection Build complex patient cohorts by combining demographics, healthcare utilization, and clinical complexity dimensions.
# Dimension 1: Demographics - elderly patients with risk factors demo_criterion = StaticCriterion( query="age >= 60 and (chronic_condition == 'True' or comorbidity_count >= 2)" ) demo_cohort = trajectory_pool.filter(demo_criterion, level="trajectory") # Report demographic cohort selection demo_cohort # Dimension 2: Healthcare utilization - multiple high-acuity visit types utilization_criterion = QueryCriterion( query="visit_type in ['EMERGENCY', 'SPECIALIST', 'SURGERY']" ) utilization_cohort = demo_cohort.filter( utilization_criterion, level="sequence", sequence_name="medical_visits", intersection=True, ) # Report utilization cohort selection utilization_cohort # Dimension 3: Clinical complexity - illness-to-treatment progression complexity_criterion = PatternCriterion( pattern={"health_state": ["ACUTE_ILLNESS", "TREATMENT"]}, contains=True ) final_complex_cohort = utilization_cohort.filter( complexity_criterion, level="sequence", sequence_name="health_states", intersection=True, ) # Report final complex cohort selection final_complex_cohort ### Set-Based Trajectory Operations Use set operations to combine trajectory IDs from different filtering criteria for flexible cohort construction.
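As a quick reminder of the set algebra involved, the sketch below uses plain Python sets with hypothetical patient IDs. It assumes that the ID collections returned by the filters behave like built-in sets, which is an assumption made for this illustration only.

```python
# Hypothetical ID sets standing in for the results of three separate filters
high_risk_ids = {"P1", "P2", "P3", "P4"}
emergency_ids = {"P3", "P4", "P5"}
treatment_ids = {"P4", "P5", "P6"}

either = high_risk_ids | emergency_ids            # union: high-risk OR emergency
both = high_risk_ids & emergency_ids              # intersection: high-risk AND emergency
high_risk_only = high_risk_ids - emergency_ids    # difference: high-risk but NOT emergency
all_three = high_risk_ids & emergency_ids & treatment_ids

print(sorted(either))
print(sorted(both))
print(sorted(high_risk_only))
print(sorted(all_three))
```

Working on ID sets keeps cohort definitions cheap to combine; the trajectory data itself only needs to be subset once, at the end.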
# Step 1: Identify high-risk patients high_risk_criterion = StaticCriterion(query="risk_level == 'HIGH'") high_risk_ids = trajectory_pool.which(high_risk_criterion) print("Set-based trajectory operations:") print(f"High-risk trajectories: {len(high_risk_ids)}") # Step 2: Identify emergency care utilizers emergency_criterion = QueryCriterion(query="visit_type == 'EMERGENCY'") emergency_ids = trajectory_pool.sequence_pools["medical_visits"].which( emergency_criterion ) print(f"Emergency care trajectories: {len(emergency_ids)}") # Step 3: Identify treatment recipients treatment_criterion = PatternCriterion( pattern={"health_state": "TREATMENT"}, contains=True ) treatment_ids = trajectory_pool.sequence_pools["health_states"].which( treatment_criterion ) print(f"Treatment trajectories: {len(treatment_ids)}") ### Set Operations for Cohort Construction Combine trajectory sets using union, intersection, and difference operations. # Union: High-risk OR emergency care union_ids = high_risk_ids.union(emergency_ids) print("Set operation results:") print(f"Union (high-risk OR emergency): {len(union_ids)} trajectories") # Intersection: High-risk AND emergency care intersection_ids = high_risk_ids.intersection(emergency_ids) print(f"Intersection (high-risk AND emergency): {len(intersection_ids)} trajectories") # Difference: High-risk but NOT emergency care difference_ids = high_risk_ids - emergency_ids print(f"Difference (high-risk NOT emergency): {len(difference_ids)} trajectories") # Triple intersection: High-risk AND emergency AND treatment triple_intersection = high_risk_ids.intersection(emergency_ids).intersection( treatment_ids ) # Create final trajectory subset from triple intersection comprehensive_cohort = trajectory_pool.subset(triple_intersection) # Report comprehensive cohort selection comprehensive_cohort # Visualize comprehensive cohort characteristics across all sequence types # fmt: off SequenceVisualizer.timeline( stacking_mode="by_category", relative_time=True, 
granularity="day" ) \ .title("Comprehensive Cohort: High-Risk + Emergency + Treatment Trajectories") \ .colors(color_map_visit) \ .marker(spacing=0.6, alpha=0.8) \ .legend(show=True, title="Visit Types", loc="upper right") \ .x_axis(label="Days from Start") \ .draw(comprehensive_cohort.sequence_pools["medical_visits"]) # fmt: on ---------------------------------------- ## Illustration of TanaT on the MIMIC-IV dataset # Illustration of TanaT on the MIMIC-IV dataset ## Overview This tutorial demonstrates the comprehensive analysis of patient care trajectories using TanaT (Temporal ANalysis of Trajectories) with the MIMIC-IV database. MIMIC-IV is a freely accessible critical care database containing de-identified health records from patients admitted to the Beth Israel Deaconess Medical Center. **Learning Objectives:** - Load and preprocess temporal healthcare data using TanaT - Create and visualize patient trajectory sequences - Apply trajectory-based clustering to identify patient care patterns - Perform survival analysis comparing different patient trajectory clusters **Dataset Information:** The MIMIC-IV database contains comprehensive electronic health records including: - Patient demographics and mortality information - Hospital admissions with admission types and locations - Medical procedures coded with ICD standards - Pharmacy prescriptions and medication data **Data Access:** - **Recommended**: TanaT provides direct database access via the `access("mimic4")` function - **Alternative**: Manual setup from [PhysioNet MIMIC-IV Demo](https://physionet.org/content/mimic-iv-demo/2.2/) following [MIT-LCP setup procedures](https://github.com/MIT-LCP/mimic-code) ## Methodology This analysis follows a systematic approach to longitudinal healthcare data analysis: 1. **Data Preparation**: Extract and preprocess multi-modal temporal data 2. **Trajectory Construction**: Build patient sequences combining events and intervals 3.
**Exploratory Visualization**: Examine event distributions and individual patient timelines 4. **Trajectory Clustering**: Group patients with similar care patterns using temporal metrics 5. **Survival Analysis**: Compare clinical outcomes across identified patient clusters The analysis focuses on admission sequences as the primary trajectory component, examining how different hospitalization patterns relate to patient outcomes. ## 1. Data Access and Initial Setup ### Import Required Libraries and Access MIMIC-IV Database We begin by importing the necessary libraries and establishing connection to the MIMIC-IV database. TanaT provides convenient direct access to a copy of the MIMIC-IV demo dataset, eliminating the need for manual database setup. import pandas as pd from tanat.dataset import access # Access MIMIC-IV database through TanaT's built-in interface # This automatically downloads and provides access to the MIMIC-IV demo dataset con = access("mimic4") ### Data Extraction and Preprocessing We extract four key data types from MIMIC-IV that will form the basis of our temporal analysis: 1. **Admissions**: Hospital stays with start/end times, admission types, and locations 2. **Patients**: Demographics including age, gender, and mortality dates 3. **Procedures**: Medical procedures with timestamps and ICD codes 4. **Pharmacy**: Medication prescriptions with administration times Each query structures the data with consistent temporal columns (`time`, `endtime`) and relevant clinical features for trajectory analysis. 
# Extract hospital admissions with temporal intervals admissions = pd.read_sql_query( 'SELECT subject_id, admittime as time, dischtime as endtime, admission_type, admission_location FROM "hosp/admissions"', con, ) # Extract patient demographics and mortality data patients = pd.read_sql_query( 'SELECT subject_id, gender, anchor_age as age, dod FROM "hosp/patients"', con ) # Extract medical procedures with ICD codes procedures = pd.read_sql_query( 'SELECT subject_id, chartdate as time, icd_code, icd_version FROM "hosp/procedures_icd"', con, ) # Extract pharmacy/medication data drugs = pd.read_sql_query( 'SELECT subject_id, starttime as time, pharmacy_id FROM "hosp/pharmacy"', con ) # Close database connection con.close() ### Create Death Events from Patient Data Patient mortality information in MIMIC-IV is stored as static dates in patient records. For trajectory analysis, we transform these dates into temporal events by creating synthetic "admission" records with type 'DEATH'. This approach allows mortality to be integrated naturally into the temporal sequence analysis. # Extract death dates and create temporal death events death_events = patients[patients.dod != ""][["subject_id", "dod"]].rename( columns={"dod": "time"} ) death_events["endtime"] = death_events["time"] # Point event (start = end time) death_events["admission_type"] = "DEATH" death_events["admission_location"] = None # No location for death events # Integrate death events into admissions timeline admissions = pd.concat((admissions, death_events)) # Clean patient data by removing death dates (now in temporal sequences) patients = patients[["subject_id", "gender", "age"]] ### Date Format Conversion Convert string-formatted timestamps to pandas datetime objects for proper temporal ordering and analysis. This standardization ensures accurate chronological sequence processing across all data types. 
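The conversion can be tried in isolation on a few made-up values. The sketch below assumes timestamps arrive as `YYYY-MM-DD HH:MM:SS` strings (the sample values are hypothetical): slicing the first 10 characters keeps only the date part, so the time of day is discarded and missing values become `NaT`.

```python
import pandas as pd

# Hypothetical timestamp strings, including one missing value
raw = pd.Series(["2180-07-23 14:00:00", "2180-05-06 22:23:00", None])

# Keep only the date part (first 10 characters), then parse to datetime
dates = pd.to_datetime(raw.str[:10])

print(dates)
```

One consequence of truncating to day granularity is that several events occurring on the same day receive identical timestamps, so their relative order within that day is no longer recoverable from the time column.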
# Convert timestamp strings to datetime objects for temporal analysis admissions.time = pd.to_datetime(admissions.time.str[:10]) admissions.endtime = pd.to_datetime(admissions.endtime.str[:10]) procedures.time = pd.to_datetime(procedures.time.str[:10]) ### Sample Data Inspection Let's examine the structure and content of our preprocessed datasets to verify data quality and understand the characteristics of our patient population. # Display sample admission data including created death events print("Admission data structure (including death events):") admissions.head() # Display patient demographic data print("Patient demographic structure:") patients.head() ## 2. TanaT Trajectory Construction ### Import TanaT Modules for Sequence and Trajectory Analysis Now we transition from raw data to TanaT's trajectory representation. We'll construct two types of sequence pools to capture different aspects of patient care. # Import TanaT sequence components from tanat.sequence import ( EventSequencePool, EventSequenceSettings, IntervalSequencePool, IntervalSequenceSettings, ) # Import TanaT trajectory components from tanat.trajectory import TrajectoryPool, TrajectoryPoolSettings ### Create TanaT Sequence Pools We create two distinct sequence pools representing different temporal aspects of patient care: 1. **Event Sequences** (Procedures): Point-in-time events with procedure codes 2. **Interval Sequences** (Admissions): Duration-based events with admission types and locations Each sequence pool captures a different dimension of the patient care trajectory, enabling comprehensive temporal analysis. 
# Create procedure event sequence pool # Procedures are point-in-time events characterized by ICD codes settings = EventSequenceSettings( id_column="subject_id", time_column="time", entity_features=["icd_code"] ) procedures_pool = EventSequencePool( sequence_data=procedures, settings=settings, ) procedures_pool # Create admission interval sequence pool # Admissions have duration and are characterized by type and location settings = IntervalSequenceSettings( id_column="subject_id", start_column="time", end_column="endtime", entity_features=["admission_type", "admission_location"], ) admissions_pool = IntervalSequencePool( sequence_data=admissions, settings=settings, ) admissions_pool ### Build Trajectory Pool with Static Features We combine our sequence pools with static patient data to create comprehensive patient trajectories. This integrated representation enables analysis that considers both temporal patterns and patient characteristics. # Configure trajectory pool settings settings = TrajectoryPoolSettings( intersection=False, # Include patients even if missing some sequence types id_column="subject_id", # Patient identifier for static data linkage static_features=["gender", "age"], # Patient demographic characteristics ) # Create comprehensive trajectory pool combining sequences and static features trajpool = TrajectoryPool( sequence_pools={"adm": admissions_pool, "proc": procedures_pool}, static_data=patients, settings=settings, ) # trajectory pool summary trajpool ### Define Index Date (T0) for Trajectories For relative temporal analysis, we establish a reference time point (T0) for each patient. Rather than using absolute timestamps, we define the index date as the time of the first procedure event, enabling comparison of relative temporal patterns across patients. 
# Define index date using first procedure event for each patient trajpool.zero_from_position(position=0, sequence_name="proc") # Display computed index dates (T0) pd.DataFrame.from_dict( trajpool.t_zero, orient="index", columns=["T0 Date"], ) ### Demonstrate Data Filtering Using Query Criteria TanaT provides flexible filtering capabilities using query-based criteria. We'll demonstrate filtering to identify patients who experienced mortality during the study period. # Import query filtering functionality from tanat.criterion import QueryCriterion # Filter for patients with death events at sequence level death_patients = admissions_pool.filter( QueryCriterion(query="admission_type == 'DEATH'"), level="sequence" ) # Demonstrate same filtering from trajectory pool level death_patients_traj = trajpool.filter( QueryCriterion(query="admission_type == 'DEATH'"), level="sequence", sequence_name="adm", ) death_patients_traj ## 3. Exploratory Data Visualization ### Import Visualization Tools We'll use TanaT's built-in visualization capabilities to explore event distributions and patient timelines before proceeding to advanced analytics. # Import TanaT visualization components from tanat.visualization.sequence import SequenceVisualizer ### Visualize Event Distributions Understanding the frequency and diversity of events in our dataset provides insights into the clinical complexity and helps inform subsequent analysis decisions. # Visualize procedure event frequency distribution # fmt: off SequenceVisualizer.histogram(bar_order="descending") \ .title("Procedure Event Distribution") \ .draw(procedures_pool) # fmt: on The procedure histogram reveals the diversity of medical interventions in the MIMIC-IV dataset. The distribution shows numerous low-frequency procedures, indicating the heterogeneous nature of critical care interventions.
# Visualize admission events (combining admission type and location) # fmt: off SequenceVisualizer.histogram(bar_order="descending") \ .title("Admission Event Distribution (Type + Location)") \ .draw(admissions_pool) # fmt: on # Focus on admission type only for clearer clinical interpretation # fmt: off SequenceVisualizer.histogram(bar_order="descending") \ .title("Admission Type Distribution") \ .draw(admissions_pool, entity_features=["admission_type"]) # fmt: on ### Create Timeline Visualizations for Individual Patients Individual patient timelines provide insights into care trajectory patterns and help validate our data preprocessing. We'll examine a specific patient's admission sequence to understand the temporal structure. # Select a representative patient for timeline visualization ids = admissions_pool.unique_ids target_id = "10021487" # Pre-selected patient with interesting trajectory # Extract individual patient sequence patient_sequence = admissions_pool[target_id] # Create timeline visualization for the selected patient # fmt: off SequenceVisualizer.timeline() \ .title(f"Care Timeline for Patient {target_id}") \ .draw(patient_sequence, entity_features=["admission_type"]) # fmt: on ## 4. Trajectory-Based Patient Clustering ### Clustering Methodology Traditional patient clustering relies on static demographic or clinical features. TanaT enables clustering based on temporal care patterns, potentially revealing clinically meaningful patient subgroups defined by their healthcare utilization trajectories. **Clustering Components:** 1. **Entity Metric**: How to compare individual admission events (Hamming distance) 2. **Sequence Metric**: How to compare entire admission sequences (Linear pairwise alignment) 3. 
**Clustering Algorithm**: Method for grouping similar trajectories (Hierarchical clustering) **Clinical Rationale:** By clustering patients based on admission patterns, we may identify distinct care pathways that correlate with clinical outcomes, resource utilization, or underlying disease processes. ### Define Entity and Sequence Metrics for Clustering We configure metrics that quantify similarity between admission sequences. The hierarchical approach first defines how to compare individual events, then extends this to compare entire sequences. # Import clustering metric components from tanat.metric.entity import HammingEntityMetric, HammingEntityMetricSettings from tanat.metric.sequence import ( LinearPairwiseSequenceMetric, LinearPairwiseSequenceMetricSettings, ) # Configure entity-level metric (Hamming distance for categorical features) hamming_metric = HammingEntityMetric( settings=HammingEntityMetricSettings( default_value=0.0 ) # Padding value for sequences of different lengths ) # Configure sequence-level metric (Linear pairwise alignment) sequence_metric_settings = LinearPairwiseSequenceMetricSettings( entity_metric=hamming_metric, # Use Hamming distance for individual event comparison ) linear_metric = LinearPairwiseSequenceMetric(settings=sequence_metric_settings) # Linear metric settings linear_metric ### Demonstrate Metric Calculation Between Sequences Before applying clustering to the entire dataset, let's examine how our metrics quantify dissimilarity between specific patient pairs. This helps validate our metric choice and understand the clustering behavior. 
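The two-level idea (an entity metric nested inside a sequence metric) can be illustrated with a few lines of plain Python. This is an illustrative sketch only, not TanaT's implementation: it assumes a position-by-position comparison, and the function names, the summation, and the way `default_value` is applied to unmatched positions are all assumptions made for the example.

```python
from itertools import zip_longest

def hamming_entity(a, b):
    """Entity-level distance: 0.0 if the two categorical values match, else 1.0."""
    return 0.0 if a == b else 1.0

def positionwise_distance(seq_a, seq_b, default_value=0.0):
    """Sequence-level distance: compare entities at the same position and sum.

    Positions present in only one sequence contribute `default_value`
    (an assumption made for this sketch).
    """
    total = 0.0
    for a, b in zip_longest(seq_a, seq_b, fillvalue=None):
        if a is None or b is None:
            total += default_value      # unmatched position (padding)
        else:
            total += hamming_entity(a, b)
    return total

# Hypothetical admission-type sequences of different lengths
seq_a = ["EW EMER.", "URGENT", "ELECTIVE"]
seq_b = ["EW EMER.", "ELECTIVE"]

print(positionwise_distance(seq_a, seq_b))                     # padding costs nothing
print(positionwise_distance(seq_a, seq_b, default_value=1.0))  # padding is penalized
```

Under these assumptions, the padding value decides whether a longer sequence is considered more distant purely because of its length, which is worth keeping in mind when interpreting clusters.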
# Select two patients for metric comparison patient1_id = "10021487" patient2_id = "10007795" # Extract their admission sequences seq1 = admissions_pool[patient1_id] seq2 = admissions_pool[patient2_id] # Colormap colormap = { "EW EMER.": "blue", "DIRECT EMER.": "red", "ELECTIVE": "green", "URGENT": "black", "DIRECT OBSERVATION": "purple", } # Visualize both sequences for comparison # fmt: off SequenceVisualizer.timeline() \ .colors(colormap) \ .title(f"Patient {patient1_id} Admission Timeline") \ .draw(seq1, entity_features=["admission_type"]) # fmt: on # fmt: off SequenceVisualizer.timeline() \ .colors(colormap) \ .title(f"Patient {patient2_id} Admission Timeline") \ .draw(seq2, entity_features=["admission_type"]) # fmt: on # Calculate dissimilarity using our configured metric dissimilarity = linear_metric(seq1, seq2) print(f"\nSequence dissimilarity: {dissimilarity:.3f}") ### Perform Hierarchical Clustering on Admission Sequences We apply hierarchical clustering to group patients with similar admission patterns. The clustering uses our configured dissimilarity metric to build a dendrogram and extract discrete patient clusters. # Import hierarchical clustering components from tanat.clustering import ( HierarchicalClusterer, HierarchicalClustererSettings, ) # Configure hierarchical clustering settings clustering_settings = HierarchicalClustererSettings( metric=linear_metric, # Use our configured sequence metric n_clusters=5, # Target number of patient clusters cluster_column="trajectory_cluster", # Column name for cluster assignments ) # Initialize and fit the clustering model clusterer = HierarchicalClusterer(settings=clustering_settings) print("Fitting hierarchical clustering to admission sequences...") clusterer.fit(admissions_pool) # Display clustering summary print("\nClustering Results:") clusterer ### Analyze Clustering Results The clustering algorithm automatically augments our data with cluster assignments. 
Let's examine the distribution of patients across clusters and understand what the clustering has identified. # Examine cluster assignments in the static data print("Updated admissions pool with clustering results:") print(admissions_pool) # Access results dataframe print("\nCluster stats:") cluster_stats = admissions_pool.static_data["trajectory_cluster"].value_counts() cluster_stats ### Extract Specific Patient Clusters for Analysis We can filter patients by cluster to enable cluster-specific analysis. This is essential for comparing clinical outcomes and understanding the characteristics of each trajectory-based patient group. # Import static filtering functionality from tanat.criterion import StaticCriterion # Extract cluster 0 cluster_id = 0 cluster_patients = admissions_pool.filter( StaticCriterion(query=f"trajectory_cluster == {cluster_id}"), level="sequence", ) cluster_patients ## 5. Survival Analysis Across Trajectory Clusters ### Clinical Outcome Comparison Methodology The ultimate validation of trajectory-based clustering is whether identified patient groups exhibit different clinical outcomes. We'll compare survival curves between clusters to assess whether admission patterns correlate with mortality risk. **Survival Analysis Approach:** 1. **Event Definition**: Use 'DEATH' admission type as mortality endpoint 2. **Time-to-Event**: Measure from trajectory start (T0) to death or censoring 3. **Cluster Comparison**: Compare Kaplan-Meier curves between largest clusters 4. **Clinical Interpretation**: Evaluate whether trajectory patterns predict survival outcomes This analysis demonstrates how temporal pattern recognition can potentially identify patients with different prognoses based on their care utilization patterns.
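For readers unfamiliar with the Kaplan-Meier estimator used in this approach, the following NumPy sketch computes it by hand on a small, hypothetical cohort. It is meant only to show how observed events and censored durations enter the estimate; the function name and the data are made up for the illustration.

```python
import numpy as np

def kaplan_meier(observed, duration):
    """Return the distinct times and the Kaplan-Meier survival estimate at each."""
    observed = np.asarray(observed, dtype=bool)
    duration = np.asarray(duration, dtype=float)
    times = np.unique(duration)
    survival = []
    s = 1.0
    for t in times:
        at_risk = np.sum(duration >= t)               # still followed just before t
        deaths = np.sum((duration == t) & observed)   # events occurring exactly at t
        s *= 1.0 - deaths / at_risk                   # product-limit update
        survival.append(s)
    return times, np.array(survival)

# Hypothetical cohort: durations in days; observed=False means censored
observed = [True, False, True, True, False]
duration = [5, 8, 8, 12, 20]

times, surv = kaplan_meier(observed, duration)
for t, s in zip(times, surv):
    print(f"day {t:>4.0f}: S(t) = {s:.3f}")
```

Censored patients never lower the curve directly; they only shrink the at-risk group, which makes later deaths weigh more heavily.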
# Import survival analysis components import matplotlib.pyplot as plt from sksurv.nonparametric import kaplan_meier_estimator from tanat.survival import SurvivalAnalysis # Initialize survival analysis with Cox regression model survival_analyzer = SurvivalAnalysis("coxnet") survival_analyzer ### Compare Survival Curves Between Trajectory Clusters We generate Kaplan-Meier survival curves for the two largest patient clusters to assess whether trajectory-based groupings correlate with differential mortality risk. Significant differences would suggest clinical relevance of the identified care patterns. # Compare survival curves between the two largest clusters plt.figure(figsize=(12, 8)) # Get the two largest clusters for comparison top_clusters = cluster_stats.nlargest(2).index.tolist() colors = ["blue", "red"] cluster_labels = [] for i, cluster_id in enumerate(top_clusters): # Filter patients in this cluster cluster_patients = admissions_pool.filter( StaticCriterion(query=f"trajectory_cluster == {cluster_id}"), level="sequence" ) # Construct survival data for this cluster survival_result = survival_analyzer.get_survival_array( sequence_pool=cluster_patients, query="admission_type == 'DEATH'", # Death event definition ) survival_data = survival_result.survival_array # Calculate Kaplan-Meier survival curve time_points, survival_probabilities = kaplan_meier_estimator( survival_data["observed"], # Event occurrence (True = death observed) survival_data["duration"], # Time to event or censoring ) # Plot survival curve n_patients = len(cluster_patients.unique_ids) n_deaths = survival_data["observed"].sum() label = f"Cluster {cluster_id} (n={n_patients}, deaths={n_deaths})" cluster_labels.append(label) plt.step( time_points, survival_probabilities, where="post", color=colors[i], linewidth=2, label=label, ) # Configure plot plt.title( "Kaplan-Meier Survival Curves by Trajectory Cluster", fontsize=14, fontweight="bold" ) plt.xlabel("Time from Index Date (days)", fontsize=12) 
plt.ylabel("Survival Probability", fontsize=12) plt.grid(True, alpha=0.3) plt.legend(fontsize=11) plt.ylim(0, 1.05) # Add statistical summary plt.figtext( 0.02, 0.02, "Note: Survival curves compare mortality risk between patient groups\n" "identified through trajectory-based clustering of admission patterns.", fontsize=9, style="italic", ) plt.tight_layout() plt.show() ---------------------------------------- ## MOOC Sequence Analysis with TanaT # MOOC Sequence Analysis with TanaT ## Overview This tutorial demonstrates sequence analysis of learner behavior using TanaT with data from a Moodle learning management system. The analysis follows the methodology proposed by Saqr et al. for understanding student engagement patterns in online courses. **Learning Objectives:** - Load and preprocess temporal learning data - Create session-based sequences from event logs - Visualize learner activity patterns - Cluster sequences using Optimal Matching with custom costs **Dataset Information:** The MOOC dataset contains learner interaction logs including: - User identifiers and timestamps - Action types (view, submit, interact, etc.) - Course context and components - Event descriptions **Source:** Saqr, M., López-Pernas, S., Helske, S., Durand, M., Murphy, K., Studer, M., & Ritschard, G. (2024). *Sequence analysis in education: principles, techniques, and tutorial with R*. In *Learning analytics methods and tutorials: A practical guide using R* (pp. 321–354). Springer. ## Methodology 1. **Data Preparation**: Load and clean event logs from the LMS 2. **Session Detection**: Identify learning sessions using inactivity thresholds 3. **Sequence Construction**: Build action sequences per session 4. **Exploratory Visualization**: Examine action distributions and timelines 5. 
**Sequence Clustering**: Group similar learning behaviors using edit (Optimal Matching) distance import pandas as pd import numpy as np import matplotlib as mpl from tanat.dataset import access ### Load the MOOC Events Dataset TanaT provides direct access to the MOOC dataset through its built-in data access interface. # Load MOOC events dataset df = access("mooc_events") df.head() ### Dataset Description The dataset contains learner interactions with the following attributes: | Column | Description | |--------|-------------| | user | Unique learner identifier | | timecreated | Timestamp of the event | | Event.context | Course name | | Component | Activity type in the course | | Event.name | Description of the action performed | | Log | Textual log description | | Action | Type of action (simplified category) | For this analysis, we focus on the `Action` feature to characterize learner behavior. ### Simplify the Alphabet Following Saqr et al., we consolidate similar quiz-related events into a single category to reduce vocabulary complexity. # Consolidate quiz-related events quiz_events = { "Quiz attempt viewed": "Quiz attempt", "Quiz attempt reviewed": "Quiz attempt", "Quiz attempt started": "Quiz attempt", "Quiz attempt summary viewed": "Quiz attempt", "Quiz attempt submitted": "Quiz attempt", } df["Event.name"] = df["Event.name"].replace(quiz_events) ## 2. Session Detection ### Define Sessions from Inactivity Threshold In this dataset, the statistical unit is not a user but a **learning session**. A session is defined as a period of continuous activity, detected by identifying gaps of inactivity. Following Saqr et al., we split each user's log into distinct sessions wherever a gap of inactivity occurs. We use a 2-hour threshold rather than the original 15-minute one, which yields fewer but longer sequences.
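The session-splitting rule can be illustrated on a toy log before applying it to the full dataset. The sketch below uses only the standard library; the `assign_sessions` helper and the sample events are hypothetical and are not part of TanaT or the dataset.

```python
from datetime import datetime, timedelta


def assign_sessions(events, threshold=timedelta(hours=2)):
    """Assign a session number to each (user, timestamp) event.

    A new session starts when the user changes or when the gap since
    the previous event of the same user exceeds `threshold`.
    """
    ordered = sorted(events)  # sort by user, then by timestamp
    sessions = []
    current = 0
    previous = None
    for user, ts in ordered:
        if previous is None or user != previous[0] or ts - previous[1] > threshold:
            current += 1
        sessions.append(current)
        previous = (user, ts)
    return ordered, sessions


# Toy log (hypothetical): two users, one long pause for u1
log = [
    ("u1", datetime(2024, 1, 1, 9, 0)),
    ("u1", datetime(2024, 1, 1, 9, 30)),
    ("u1", datetime(2024, 1, 1, 12, 0)),   # 2.5 h after the previous event
    ("u2", datetime(2024, 1, 1, 12, 10)),
    ("u2", datetime(2024, 1, 1, 12, 20)),
]

ordered, sessions = assign_sessions(log)
for (user, ts), session in zip(ordered, sessions):
    print(session, user, ts.time())
```

A larger threshold merges more events into the same session, which is the trade-off between 15 minutes and 2 hours discussed above.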
# Define inactivity threshold for session detection INACTIVITY_THRESHOLD = pd.Timedelta("2h") # Detect sessions: new session when user changes OR time gap exceeds threshold df = df.sort_values(["user", "timecreated"]) df.timecreated = pd.to_datetime(df.timecreated) df["session"] = ( (df["user"] != df["user"].shift()) | (df["timecreated"].diff() > INACTIVITY_THRESHOLD) ).cumsum() print(f"Detected {df['session'].nunique()} sessions from {df['user'].nunique()} users") df.head() # Keep session-to-user mapping for later analysis sessions = df[["user", "session"]].drop_duplicates() sessions.head() ### Create Sequential Index For sequence analysis focusing on event order (rather than timestamps), we add a position index within each session. # Add position index within each session def add_position_index(group): group = group.copy() group["index"] = range(len(group)) return group df_indexed = ( df.groupby("session", group_keys=True) .apply(add_position_index, include_groups=False) .reset_index(drop=False) ) df_indexed.head() ## 3. Sequence Construction with TanaT ### Define the Sequence Type and Statistical Unit In sequence analysis, the **actor** (statistical individual) determines what constitutes a sequence. Here, each learning session becomes a sequence, with the student as a static characteristic of that session. We use **state sequences** to match the TraMineR data model from the original publication, where each position represents a discrete action state. 
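To make this data model concrete: with the position index as the time axis, each session reduces to an ordered list of (position, action) pairs. The following is a minimal standard-library sketch with a hypothetical `to_position_sequences` helper and made-up rows; it only illustrates the shape of the data, not how TanaT stores it internally.

```python
from collections import defaultdict


def to_position_sequences(rows):
    """Group (session, action) rows, already in time order, into
    per-session lists of (position, action) pairs."""
    sequences = defaultdict(list)
    for session, action in rows:
        position = len(sequences[session])  # 0 for the first event, then 1, 2, ...
        sequences[session].append((position, action))
    return dict(sequences)


# Toy rows (hypothetical action labels)
rows = [
    (1, "View"),
    (1, "Quiz"),
    (2, "View"),
    (1, "Submit"),
]

for session, sequence in to_position_sequences(rows).items():
    print(session, sequence)
```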
from tanat.sequence import StateSequencePool, StateSequenceSettings # Configure sequence settings settings = StateSequenceSettings( id_column="session", # Session as statistical unit start_column="index", # Use position index as time entity_features=["Action", "Event.context", "Event.name"], static_features=["user"], # Link sessions to users ) # Create sequence pool moocpool = StateSequencePool( sequence_data=df_indexed, static_data=sessions, settings=settings, ) moocpool **Pool Summary:** - **5,700 sequences** with lengths ranging from 2 to 153 (mean: 16.8) - **Vocabulary size: 246** unique entity combinations - Entity features automatically inferred as categorical ## 4. Exploratory Data Visualization ### Pool Statistics # Overview statistics moocpool.statistics # Aggregated descriptive statistics moocpool.describe(dropna=True, by_id=False) # Sequence length distribution lengths = moocpool.describe(dropna=True, by_id=True)["length"] lengths.plot.box(figsize=(8, 3), vert=False).set_title("Sequence Length Distribution") ### Filter Sequences by Length The length distribution shows that 90% of sequences have fewer than 40 events. We filter out very short sessions (single event) and very long outliers. from tanat.criterion import LengthCriterion # Keep sequences with 2-40 events moocpool_filtered = moocpool.filter(LengthCriterion(gt=1, le=40), level="sequence") moocpool_filtered ### Visualize Action Distribution For interpretability, we focus on the `Action` feature, reducing the vocabulary from 246 to 12 action types. To ensure consistent colors across all visualizations, we define a color palette mapping each action to a specific color. 
# Get action vocabulary and create a consistent color palette actions = list(moocpool_filtered.get_vocabulary("Action")) colors = mpl.colormaps["tab20"].colors[: len(actions)] ACTION_COLORS = dict(zip(actions, [mpl.colors.to_hex(c) for c in colors])) print(f"Actions ({len(actions)}): {actions}") from tanat.visualization.sequence import SequenceVisualizer # fmt: off SequenceVisualizer.histogram(show_as="occurrence", bar_order="descending") \ .colors(ACTION_COLORS) \ .title("Action Type Distribution") \ .draw(moocpool_filtered, entity_features=["Action"]) # fmt: on ### Timeline Visualizations Timelines show the sequence of actions within learning sessions. We use `ACTION_COLORS` to maintain visual consistency across all plots. # Timeline for 100 random sessions (flat stacking) import random random.seed(42) sample_ids = random.sample(list(moocpool_filtered.unique_ids), 100) sample_pool = moocpool_filtered.subset(sample_ids) # fmt: off SequenceVisualizer.timeline(stacking_mode="flat") \ .colors(ACTION_COLORS) \ .marker(spacing=1) \ .title("100 Random Sessions") \ .xlabel("Position in Session") \ .draw(sample_pool, entity_features=["Action"]) # fmt: on # Timeline for a single session (session N°5) # fmt: off single_session = moocpool_filtered[5] SequenceVisualizer.timeline() \ .colors(ACTION_COLORS) \ .title("Single Session Timeline") \ .xlabel("Position in Session") \ .draw(single_session, entity_features=["Action"]) # fmt: on ### State Distribution Over Time Distribution plots show how action proportions change across sequence positions. # fmt: off SequenceVisualizer.distribution() \ .colors(ACTION_COLORS) \ .title("Action Distribution Over Session Progress") \ .xlabel("Position in Session") \ .draw(moocpool_filtered, entity_features=["Action"]) # fmt: on ## 5. Alternative: Timestamp-Based Sequences In the previous analysis, we used sequential order only. Here we demonstrate how to use actual timestamps, creating event sequences with real temporal information. 
from tanat.sequence import EventSequencePool, EventSequenceSettings # Configure with actual timestamps settings = EventSequenceSettings( id_column="session", time_column="timecreated", entity_features=["Action", "Event.context", "Event.name"], static_features=["user"], ) # Create event sequence pool moocpool_timed = EventSequencePool( sequence_data=df, static_data=sessions, settings=settings, ) moocpool_timed.granularity = "minute" moocpool_timed ### Timeline with Real Timestamps The same session now shows actual temporal spacing between events. # Timeline for 100 random sessions (by category stacking) import random random.seed(42) sample_ids = random.sample(list(moocpool_timed.unique_ids), 100) sample_pool = moocpool_timed.subset(sample_ids) # fmt: off SequenceVisualizer.timeline(stacking_mode="by_category") \ .colors(ACTION_COLORS) \ .marker(spacing=1) \ .title("100 Random Sessions") \ .xlabel("Absolute time") \ .draw(sample_pool, entity_features=["Action"]) # fmt: on ## 6. Sequence Clustering with Optimal Matching ### Define Custom Substitution Costs Following Saqr et al., we use Optimal Matching (OM) with manually defined costs between action types. These costs reflect the semantic similarity between different learner actions. # Action vocabulary for reference moocpool_timed.get_vocabulary("Action") # Substitution costs from Saqr et al. (2024) # Higher values = more dissimilar action types # fmt: off cost = {...} # 74 lines, truncated for brevity # fmt: on ### Configure TanaT Metrics We combine: 1. **Hamming entity metric** with custom costs for comparing individual actions 2. 
**Edit distance** (Optimal Matching) for comparing entire sequences from tanat.metric.entity import HammingEntityMetric, HammingEntityMetricSettings from tanat.metric.sequence import EditSequenceMetric, EditSequenceMetricSettings # Entity metric with custom substitution costs entity_settings = HammingEntityMetricSettings( cost=cost, entity_features=["Action"], default_value=0.0, ) entity_metric = HammingEntityMetric(settings=entity_settings) # Sequence metric: Edit distance (Optimal Matching) sequence_settings = EditSequenceMetricSettings(entity_metric=entity_metric) sequence_metric = EditSequenceMetric(settings=sequence_settings) # Test metric on two sequences sequence_metric(moocpool_timed[1], moocpool_timed[2]) ### Apply Hierarchical Clustering We use hierarchical clustering to group sessions with similar action patterns. The number of clusters (15) follows the original study setup. from tanat.clustering import HierarchicalClusterer, HierarchicalClustererSettings # Configure clustering cluster_settings = HierarchicalClustererSettings( metric=sequence_metric, n_clusters=15, cluster_column="cluster", ) # Fit clustering model clusterer = HierarchicalClusterer(settings=cluster_settings) clusterer.fit(moocpool_timed) ## 7. Cluster Visualization We now visualize the clusters identified in the previous step. We focus on the 6 largest clusters and reuse `ACTION_COLORS` for consistency. # Get the 6 largest clusters largest_clusters = moocpool_timed.static_data.cluster.value_counts().head(6) top_6_clusters = largest_clusters.index.tolist() from tanat.criterion import StaticCriterion # Filter to top 6 clusters moocpool_top6 = moocpool_timed.filter( StaticCriterion(query=f"cluster in {top_6_clusters}"), level="sequence", ) ### Timeline by Cluster We sample 30 sessions per cluster and align events to the session start using `zero_from_position(0)`. 
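Aligning to the session start can be pictured as subtracting the first timestamp of a session from every event in it. The sketch below assumes that this is what `zero_from_position(0)` amounts to (position 0 taken as time zero); the `align_to_start` helper is hypothetical and uses only the standard library.

```python
from datetime import datetime


def align_to_start(timestamps):
    """Return minutes elapsed since the earliest timestamp of a session."""
    ordered = sorted(timestamps)
    origin = ordered[0]  # event at position 0 becomes time zero
    return [(ts - origin).total_seconds() / 60 for ts in ordered]


# Toy session (hypothetical timestamps)
session = [
    datetime(2024, 1, 1, 10, 0),
    datetime(2024, 1, 1, 10, 5),
    datetime(2024, 1, 1, 10, 30),
]

print(align_to_start(session))
```

Once every session starts at zero, sessions recorded on different days can share a common relative-time axis within a facet.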
N_IDS_PER_CLUSTER = 30 # Sample 30 sequences per cluster static_df = moocpool_top6.static_data sampled_ids = [] for cluster_id, group in static_df.groupby("cluster", observed=True): n = min(N_IDS_PER_CLUSTER, len(group)) sampled_ids.extend(group.sample(n=n, random_state=42).index.tolist()) subset_timeline = moocpool_top6.subset(sampled_ids) subset_timeline.zero_from_position(0) # Default behavior, explicitly written print( f"Sampled {len(sampled_ids)} sessions across {len(top_6_clusters)} clusters (6x{N_IDS_PER_CLUSTER})" ) # fmt: off SequenceVisualizer.timeline(stacking_mode="flat", relative_time=True) \ .colors(ACTION_COLORS) \ .marker(spacing=0.5) \ .x_axis(autofmt_xdate=False) \ .facet(by="cluster", cols=3, share_x=False, title_template="Cluster {value}") \ .title("Session Timelines by Cluster") \ .xlabel("Relative time (minutes)") \ .draw(subset_timeline, entity_features=["Action"]) # fmt: on ### Distribution by Cluster Converting to state sequences allows us to visualize aggregated action proportions over time within each cluster. # Convert to state sequences for distribution visualization moocpool_state = moocpool_top6.as_state() moocpool_state.zero_from_position(0) # Default behavior, explicitly written # fmt: off SequenceVisualizer.distribution(relative_time=True) \ .colors(ACTION_COLORS) \ .facet(by="cluster", cols=3, share_x=False, title_template="Cluster {value}") \ .legend(title="Action") \ .xlabel("Relative time (minutes)") \ .title("Session Distribution by Cluster") \ .draw(moocpool_state, entity_features=["Action"]) # fmt: on ## Conclusion This tutorial demonstrated the correspondence between TanaT and TraMineR by reproducing a learning analytics study originally conducted with TraMineR. TanaT's flexibility allowed us to work with both sequential order and real timestamps, providing richer temporal information than traditional approaches. 
By implementing edit distance with domain-specific substitution costs, we captured the semantic similarity between different learner actions. The faceted visualization capabilities enabled easy comparison of behavioral patterns across the 15 identified clusters. While our analysis uses actual timestamps rather than just event order, which may explain some differences from the original study, the methodology successfully demonstrates how TanaT can replicate and extend sophisticated sequence analysis workflows from educational research. ---------------------------------------- ## Metadata Management .. _metadata_tutorial: # Metadata Management This tutorial demonstrates how to work with metadata in *TanaT*. Metadata describes the structure and types of your temporal data, enabling proper type coercion, validation, and analysis. **Learning Objectives:** - Understand automatic metadata inference (default behavior) - Inspect and validate inferred metadata - Correct inference errors using update methods - Provide explicit metadata for advanced control - Ensure metadata coherence across sequences and trajectories ## Required Imports We'll use TanaT's sequence and trajectory components along with some utilities for visualization. from datetime import datetime import pandas as pd # TanaT sequence pools from tanat.sequence import EventSequencePool # TanaT trajectory pool from tanat.trajectory import TrajectoryPool ## 1. Automatic Metadata Inference TanaT automatically infers metadata from your data when you create sequence pools. This is the default behavior and requires no configuration. 
Let's create a simple dataset of patient visits and see what metadata TanaT infers: # Create a simple healthcare dataset data = pd.DataFrame( { "patient_id": [101, 101, 101, 102, 102], "timestamp": [ datetime(2023, 1, 10, 9, 0), datetime(2023, 1, 17, 14, 30), datetime(2023, 2, 5, 10, 15), datetime(2023, 1, 12, 11, 0), datetime(2023, 1, 20, 16, 45), ], "event_type": [ "consultation", "lab_test", "consultation", "consultation", "lab_test", ], "department": [ "cardiology", "laboratory", "cardiology", "neurology", "laboratory", ], } ) # Create an EventSequencePool without specifying metadata # TanaT will infer everything automatically visits_pool = EventSequencePool( sequence_data=data, settings={ "id_column": "patient_id", "time_column": "timestamp", "entity_features": ["event_type", "department"], }, ) print("EventSequencePool created successfully!") print(f"Number of patients: {len(visits_pool)}") print(f"Total visits: {len(data)}") ### What metadata was inferred? The `.metadata` attribute contains all the inferred information about temporal and entity features: # Access the metadata object visits_pool.metadata ## 2. Inspecting Metadata TanaT provides convenient methods to inspect metadata in human-readable formats: - `.metadata.view()`: Display metadata as YAML with documentation. - `.metadata.describe()`: Display metadata with descriptions (human-friendly) # Describe metadata with human-readable explanations print("Human-Readable Description:") print(visits_pool.metadata.describe(verbose=False)) print("\n" + "=" * 60 + "\n") # View metadata as YAML print("YAML Representation:") visits_pool.metadata.view() ## 3. Correcting Inference Errors Sometimes automatic inference might not match your requirements. TanaT provides update methods to correct metadata after creation. 
### Example: Updating Temporal Metadata Let's say we want to change the timezone setting for our timestamps: # Update temporal metadata with a specific timezone visits_pool.update_temporal_metadata(timezone="Europe/Paris") print(visits_pool.metadata.describe(verbose=True)) # You can also update the date format visits_pool.update_temporal_metadata(format="%Y-%m-%d %H:%M") print(visits_pool.metadata.describe(verbose=True)) ### Example: Updating Entity Metadata We can also correct metadata for entity features (features that vary within sequences): # Let's say we want to specify that department is an ordinal feature # with a specific order visits_pool.update_entity_metadata( feature_name="department", feature_type="categorical", categories=["laboratory", "cardiology", "neurology", "emergency"], ordered=True, ) print(visits_pool.metadata.describe(verbose=True)) ### Method Chaining Update methods return `self`, allowing you to chain multiple updates: # Chain multiple updates together # fmt: off visits_pool.update_temporal_metadata(timezone="UTC") \ .update_entity_metadata( feature_name="event_type", feature_type="categorical", categories=["consultation", "lab_test", "surgery", "emergency"], ) # fmt: on print(visits_pool.metadata.describe(verbose=True)) ### Changing Temporal Type You can even change the temporal type entirely. 
For example, converting from datetime to timestep: # Create a simple timestep-based dataset timestep_data = pd.DataFrame( { "patient_id": [201, 201, 201, 202, 202], "timestep": [1, 5, 10, 2, 8], "measurement": ["BP", "HR", "BP", "BP", "HR"], "value": [120, 75, 118, 130, 82], } ) # Create pool and then change from default datetime to timestep measurements_pool = EventSequencePool( sequence_data=timestep_data, settings={ "id_column": "patient_id", "time_column": "timestep", "entity_features": ["measurement", "value"], }, ) # Update to timestep with appropriate settings measurements_pool.update_temporal_metadata( temporal_type="timestep", min_value=1, max_value=100 ) print(measurements_pool.metadata.describe(verbose=True)) ## 4. Specifying Metadata Explicitly (Advanced) Instead of relying on inference, you can provide metadata explicitly at initialization. This is useful when: - You know the exact metadata structure you need - You want to avoid inference overhead - You need to ensure specific settings from the start # Define explicit metadata explicit_metadata = { "temporal_descriptor": { "temporal_type": "datetime", "granularity": "second", "settings": { "timezone": "America/New_York", "date_format": "%Y-%m-%d %H:%M:%S", }, }, "entity_descriptors": { "event_type": { "feature_type": "categorical", "settings": { "categories": ["consultation", "lab_test", "surgery"], }, }, "department": { "feature_type": "categorical", "settings": { "categories": ["cardiology", "neurology", "emergency"], }, }, }, } # Create pool with explicit metadata explicit_pool = EventSequencePool( sequence_data=data, settings={ "id_column": "patient_id", "time_column": "timestamp", "entity_features": ["event_type", "department"], }, metadata=explicit_metadata, ) # Pool overview explicit_pool ## 5. 
Sequence vs Trajectory Level Metadata **Critical distinction:** TanaT has two levels of data organization: - **Sequence level**: Sequences of a single type (e.g., each patient's visits) - **Trajectory level**: Several sequences combined per individual (e.g., each patient's visits and medications) Metadata updates behave differently at each level! ### Sequence-level updates When you update metadata on a **sequence pool**, you're updating metadata for all sequences in that pool: # Sequence-level update affects all sequences in the pool visits_pool.update_temporal_metadata(timezone="Europe/London") # Access sequence within the pool print(visits_pool[101].metadata.describe(verbose=True)) ### Trajectory-level updates and propagation When you update metadata on a **trajectory pool**, the changes **automatically propagate** to all contained sequence pools. This ensures **metadata coherence** across the entire trajectory. # Create a second sequence pool (medications) meds_data = pd.DataFrame( { "patient_id": [101, 101, 102, 102], "timestamp": [ datetime(2023, 1, 10, 10, 0), datetime(2023, 1, 17, 15, 0), datetime(2023, 1, 12, 12, 0), datetime(2023, 1, 20, 17, 0), ], "medication": ["aspirin", "metformin", "aspirin", "lisinopril"], "dosage": [100, 500, 100, 10], } ) medications_pool = EventSequencePool( sequence_data=meds_data, settings={ "id_column": "patient_id", "time_column": "timestamp", "entity_features": ["medication", "dosage"], }, ) # Create a trajectory pool combining visits and medications trajectory = TrajectoryPool( sequence_pools={"visits": visits_pool, "medications": medications_pool} ) # Pool overview trajectory # Update temporal metadata at TRAJECTORY level # This will propagate to ALL sequence pools trajectory.update_temporal_metadata(timezone="Asia/Tokyo") # Check updated timezones in both pools print("Visits Sequence Metadata:", "\n--------------------") print(trajectory.sequence_pools["visits"].metadata.describe(verbose=True)) print("\n\nMedications Sequence Metadata:", "\n--------------------") 
print(trajectory.sequence_pools["medications"].metadata.describe(verbose=True)) ### Static metadata (trajectory-specific features) Static features exist at the **trajectory level** (they don't vary within sequences). Examples: patient age, gender, diagnosis at baseline. # Add static data to trajectory static_data = pd.DataFrame( {"patient_id": [101, 102], "age": [45, 62], "gender": ["M", "F"]} ) trajectory_with_static = TrajectoryPool( sequence_pools={"visits": visits_pool, "medications": medications_pool}, static_data=static_data, settings={ "id_column": "patient_id", "static_features": ["age", "gender"], }, ) # Update static metadata trajectory_with_static.update_static_metadata( feature_name="gender", feature_type="categorical", categories=["M", "F", "Other"], ) print(trajectory_with_static.metadata.describe(verbose=True)) ## 6. Complete Healthcare Example Let's put it all together with a realistic healthcare scenario: tracking patient journeys through a hospital system. # Step 1: Create comprehensive patient event data patient_events = pd.DataFrame( { "patient_id": [1001, 1001, 1001, 1001, 1002, 1002, 1002, 1003, 1003], "timestamp": [ datetime(2023, 6, 1, 9, 0), datetime(2023, 6, 3, 14, 30), datetime(2023, 6, 10, 11, 15), datetime(2023, 6, 15, 16, 45), datetime(2023, 6, 2, 10, 30), datetime(2023, 6, 8, 13, 0), datetime(2023, 6, 20, 9, 30), datetime(2023, 6, 5, 8, 45), datetime(2023, 6, 12, 15, 0), ], "event": [ "admission", "surgery", "consultation", "discharge", "admission", "lab_test", "discharge", "admission", "emergency", ], "department": [ "emergency", "surgery", "cardiology", "discharge_unit", "cardiology", "laboratory", "discharge_unit", "neurology", "emergency", ], "severity": [3, 4, 2, 1, 2, 1, 1, 5, 5], } ) # Step 2: Create medication events medication_events = pd.DataFrame( { "patient_id": [1001, 1001, 1001, 1002, 1002, 1003], "timestamp": [ datetime(2023, 6, 1, 10, 0), datetime(2023, 6, 3, 18, 0), datetime(2023, 6, 10, 12, 0), datetime(2023, 6, 2, 
11, 0), datetime(2023, 6, 8, 14, 30), datetime(2023, 6, 5, 9, 0), ], "medication": [ "morphine", "antibiotic", "aspirin", "metformin", "aspirin", "morphine", ], "dosage_mg": [10, 500, 100, 1000, 100, 15], "route": ["IV", "oral", "oral", "oral", "oral", "IV"], } ) # Step 3: Create static patient data patient_demographics = pd.DataFrame( { "patient_id": [1001, 1002, 1003], "age": [54, 68, 72], "gender": ["M", "F", "M"], "diagnosis": ["cardiac_event", "diabetes", "stroke"], } ) print("Created comprehensive healthcare dataset!") print(f"Clinical events: {len(patient_events)}") print(f"Medication events: {len(medication_events)}") print(f"Patients: {len(patient_demographics)}") # Step 4: Create sequence pools with automatic inference clinical_pool = EventSequencePool( sequence_data=patient_events, settings={ "id_column": "patient_id", "time_column": "timestamp", "entity_features": ["event", "department", "severity"], }, ) medication_pool = EventSequencePool( sequence_data=medication_events, settings={ "id_column": "patient_id", "time_column": "timestamp", "entity_features": ["medication", "dosage_mg", "route"], }, ) print("Sequence pools created with automatic metadata inference!") print("\nClinical pool metadata:") print(clinical_pool.metadata.describe()) print("\n" + "=" * 60) print("\nMedication pool metadata:") print(medication_pool.metadata.describe()) # Step 5: Correct metadata after inspection # fmt: off # -- clinical pool updates clinical_pool.update_entity_metadata( feature_name="severity", # correcting severity to ordinal categorical feature_type="categorical", categories=[1, 2, 3, 4, 5], ordered=True, ) \ .update_temporal_metadata( timezone="America/New_York" ) ## -- medication pool updates medication_pool.update_entity_metadata( feature_name="dosage_mg", # correcting dosage to ordinal categorical feature_type="categorical", categories=[10, 15, 100, 500, 1000], ordered=True, ) \ .update_temporal_metadata( timezone="America/New_York" ) # fmt: on # Step 6: 
Create trajectory with static features patient_trajectory = TrajectoryPool( sequence_pools={"clinical_events": clinical_pool, "medications": medication_pool}, static_data=patient_demographics, settings={ "id_column": "patient_id", "static_features": ["age", "gender", "diagnosis"], }, ) # fmt: off # Update static metadata patient_trajectory.update_static_metadata( feature_name="gender", feature_type="categorical", categories=["M", "F", "Other"], ) \ .update_static_metadata( feature_name="diagnosis", feature_type="categorical", categories=[ "cardiac_event", "diabetes", "stroke", "respiratory", "other", ], ) # fmt: on print(patient_trajectory.metadata.describe(verbose=True)) # Step 7: Demonstrate trajectory-level propagation # Update timezone at trajectory level - it propagates to all sequence pools patient_trajectory.update_temporal_metadata( timezone="UTC", ) print("Trajectory level:", "\n----------------") print(patient_trajectory.metadata.describe(verbose=True)) print("\n\nClinical pool level:", "\n----------------") print( patient_trajectory.sequence_pools["clinical_events"].metadata.describe(verbose=True) ) print("\n\nMedication pool level:", "\n----------------") print(patient_trajectory.sequence_pools["medications"].metadata.describe(verbose=True)) ---------------------------------------- ## Sequence Conversions .. _type_conversions_tutorial: # Sequence Type Conversions This tutorial demonstrates conversions between the three temporal sequence types: - **Event**: point-in-time occurrences - **State**: continuous periods with status values - **Interval**: time periods with durations We'll use a simple hospital patient journey to illustrate each conversion. ## Setup from datetime import datetime, timedelta import pandas as pd from tanat.sequence import EventSequencePool ## Sample Data: Hospital Patient Journeys Three patients with different admission/transfer/discharge events. 
# Event data: admission, transfers, and discharge events event_data = pd.DataFrame( { "patient_id": [101, 101, 101, 101, 102, 102, 102, 103, 103, 103], "timestamp": [ datetime(2023, 6, 1, 9, 0), # Patient 101 datetime(2023, 6, 1, 14, 0), datetime(2023, 6, 2, 10, 0), datetime(2023, 6, 2, 16, 0), datetime(2023, 6, 1, 10, 30), # Patient 102 datetime(2023, 6, 1, 18, 0), datetime(2023, 6, 2, 12, 0), datetime(2023, 6, 1, 11, 0), # Patient 103 datetime(2023, 6, 1, 15, 30), datetime(2023, 6, 2, 14, 0), ], "event_type": [ "admission", "transfer", "transfer", "discharge", "admission", "transfer", "discharge", "admission", "transfer", "discharge", ], "location": [ "Emergency", "ICU", "Ward", None, "Emergency", "Ward", None, "Emergency", "ICU", None, ], } ) events_pool = EventSequencePool( sequence_data=event_data, settings={ "id_column": "patient_id", "time_column": "timestamp", "entity_features": ["event_type", "location"], }, ) print("EventSequencePool created") events_pool ## 1. Event to State Conversion Convert events to continuous states by specifying: - `state_value_col`: the column containing state values - `end_value`: a datetime to use as the end time for the last state in each sequence The `end_value` parameter sets when all final states terminate (e.g., current date). # Convert events to states # All sequences will end on June 3, 2023 states_pool = events_pool.as_state(end_value=datetime(2023, 6, 3, 0, 0)) states_pool ## 2. State to Event Conversion Extract events from states using the `anchor` parameter: - `"start"`: event at the beginning of each state - `"end"`: event at the end of each state - `"both"`: events at both start and end # Extract start events => back to original events pool events_from_start = states_pool.as_event(anchor="start") events_from_start # Extract end events events_from_end = states_pool.as_event(anchor="end") events_from_end ## 3. State to Interval Conversion State and Interval are structurally equivalent (both have start/end times). 
The conversion is trivial. # Convert states to intervals intervals_pool = states_pool.as_interval() intervals_pool ## 4. Event to Interval with Duration Convert events to intervals by specifying a duration. Duration can be: - A scalar `timedelta` (fixed duration for all events) - A column name containing duration values - A `DateOffset` for calendar-aware durations ### 4.1 Fixed Duration (timedelta) # Medication events with fixed 6-hour duration medication_data = pd.DataFrame( { "patient_id": [101, 101, 102, 103], "timestamp": [ datetime(2023, 6, 1, 10, 0), datetime(2023, 6, 1, 16, 0), datetime(2023, 6, 1, 12, 0), datetime(2023, 6, 1, 14, 0), ], "medication": ["Antibiotics", "Painkillers", "Antibiotics", "Antibiotics"], } ) medications_pool = EventSequencePool( sequence_data=medication_data, settings={ "id_column": "patient_id", "time_column": "timestamp", "entity_features": ["medication"], }, ) # Convert with fixed duration medication_intervals = medications_pool.as_interval(duration=timedelta(hours=6)) medication_intervals ### 4.2 Variable Duration (column) When each event has its own duration, we recommend storing the duration in a column and declaring it as a duration feature.
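Conceptually, both duration modes reduce to adding a length to each event's start time. Here is a minimal standard-library sketch of that idea; the `to_intervals` helper, its `unit` parameter, and the sample events are hypothetical and are not part of TanaT's API.

```python
from datetime import datetime, timedelta


def to_intervals(events, duration, unit=timedelta(hours=1)):
    """Turn events into (start, end) pairs.

    `duration` is either a timedelta applied to every event, or the
    name of a per-event field holding a number of `unit`s.
    """
    intervals = []
    for event in events:
        if isinstance(duration, timedelta):
            length = duration                 # fixed duration
        else:
            length = event[duration] * unit   # per-event duration
        intervals.append((event["start"], event["start"] + length))
    return intervals


# Toy events (hypothetical)
events = [
    {"start": datetime(2023, 6, 1, 11, 0), "duration_hours": 1},
    {"start": datetime(2023, 6, 1, 16, 0), "duration_hours": 2},
]

print(to_intervals(events, timedelta(hours=6)))   # same length for all events
print(to_intervals(events, "duration_hours"))     # length read from each event
```

Passing the unit explicitly plays the role that the declared granularity plays in the tutorial: the stored number is meaningless until it is known whether it counts hours, days, or abstract units.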
# Procedure events with variable durations procedure_data = pd.DataFrame( { "patient_id": [101, 102, 103], "timestamp": [ datetime(2023, 6, 1, 11, 0), datetime(2023, 6, 1, 13, 0), datetime(2023, 6, 1, 16, 0), ], "procedure": ["X-Ray", "X-Ray", "MRI"], "duration_hours": [1, 1, 2], # Variable durations } ) procedures_pool = EventSequencePool( sequence_data=procedure_data, settings={ "id_column": "patient_id", "time_column": "timestamp", "entity_features": ["procedure", "duration_hours"], }, ) # Declare the duration column procedures_pool.update_entity_metadata( feature_name="duration_hours", feature_type="duration", granularity="hour" ) # Convert using the duration column procedure_intervals = procedures_pool.as_interval(duration="duration_hours") procedure_intervals ### 4.3 Duration in Days For longer durations, we can use days. The duration column should contain numerical values representing the number of days. # Treatment events with durations in days treatment_data = pd.DataFrame( { "patient_id": [101, 102, 103], "start_date": [ datetime(2023, 1, 15), datetime(2023, 2, 28), datetime(2023, 3, 15), ], "treatment": ["Chemotherapy", "Radiotherapy", "Chemotherapy"], "duration_days": [90, 60, 180], # Durations in days } ) treatments_pool = EventSequencePool( sequence_data=treatment_data, settings={ "id_column": "patient_id", "time_column": "start_date", "entity_features": ["treatment", "duration_days"], }, ) # Declare duration with day granularity treatments_pool.update_entity_metadata( feature_name="duration_days", feature_type="duration", granularity="day", ) # Convert with day-based durations treatment_intervals = treatments_pool.as_interval(duration="duration_days") treatment_intervals ### 4.4 Timestep-Based Sequences with UNIT Granularity When working with data where time is encoded as `timestep`, use the `UNIT` granularity for durations. This preserves timesteps as floats without converting to timedelta.
    # Simulation data with abstract timesteps and fractional durations
    timestep_data = pd.DataFrame(
        {
            "patient_id": [101, 101, 101, 102, 102, 103],
            "timestep": [0.0, 5.0, 10.0, 0.0, 3.0, 2.0],
            "event_type": [
                "start",
                "medication",
                "discharge",
                "start",
                "test",
                "procedure",
            ],
            "duration_units": [5.5, 4.5, None, 3.25, 1.75, 2.0],  # Float durations
        }
    )

    timesteps_pool = EventSequencePool(
        sequence_data=timestep_data,
        settings={
            "id_column": "patient_id",
            "time_column": "timestep",
            "entity_features": ["event_type", "duration_units"],
        },
    )

    # Declare duration with UNIT granularity (no conversion, preserves floats)
    timesteps_pool.update_entity_metadata(
        feature_name="duration_units", feature_type="duration", granularity="unit"
    )

    print("Timestep-based events:")
    timesteps_pool

    # Convert to intervals using UNIT durations (float addition, no timedelta)
    timestep_intervals = timesteps_pool.as_interval(duration="duration_units")

    print("Timestep-based intervals (floats preserved):")
    timestep_intervals

## 5. Working with Metadata

After conversion, always verify the metadata and update it as needed.

    # Check metadata after conversion
    print("Metadata after Event -> State conversion:")
    print(states_pool.metadata.describe(verbose=True))

    # Update metadata for a categorical feature
    states_pool.update_entity_metadata(
        feature_name="location",
        feature_type="categorical",
        categories=["Emergency", "ICU", "Ward"],
    )
    print(states_pool.metadata.describe(verbose=True))

----------------------------------------

## Contributing

First of all, thank you for considering contributing to *TanaT*. It is still an experimental toolkit, but it has received a warm welcome from various communities that are interested in its functionalities.

Contributions are managed through GitLab Issues and Pull Requests.
We welcome contributions in the following forms:

- **Bug reports**: when filing an issue to report a bug, please use the search tool to ensure the bug hasn't been reported yet;
- **New feature suggestions**: if you think *TanaT* should include a new algorithm, please open an issue to ask for it (of course, first check that the feature has not already been requested). When suggesting a new algorithm, consider linking to a PDF of the paper that first proposed the method.
- **Bug fixes and new feature implementations**: if you feel you can fix a reported bug or implement a suggested feature yourself, do not hesitate to:

  1. fork the project;
  2. implement your bug fix or feature;
  3. submit a pull request referencing the ID of the issue in which the bug was reported or the feature was suggested.

If you would like to implement a feature requested in the Issues, those carrying the "good first issue" label are a good place to start.

When submitting code, please pay attention to code quality: add proper docstrings, including doctests, and aim for high code coverage.

### More details on Pull requests

The preferred workflow for contributing to *TanaT* is to fork the main repository on GitLab, clone it, and develop on a branch. Steps:

1. Fork the project repository by clicking on the 'Fork' button near the top right of the page. This creates a copy of the code under your GitHub user account. For more details on how to fork a repository see this guide.

2. Clone your fork of the *TanaT* repo to your local disk::

       $ git clone git@github.com:YourLogin/tanat.git
       $ cd tanat

3. Create a `my-feature` branch to hold your development changes. Always work on a feature branch rather than directly on the `master` branch::

       $ git checkout -b my-feature

4. Develop the feature on your feature branch. To record your changes in git, stage the changed files with `git add`, then commit them with `git commit`::

       $ git add modified_files
       $ git commit

5. Push the changes with::

       $ git push -u origin my-feature

6. Follow these instructions to create a pull request from your fork. This will send an email to the committers.

(If any of the above seems like magic to you, please look up the Git documentation on the web, or ask a friend or another contributor for help.)

----------------------------------------

## Citing *TanaT*

If you use *TanaT* in a scientific publication, please cite:

    @inproceedings{tanat2025,
      title={Towards a Library for the Analysis of Temporal Sequences},
      author={Thomas Guyet and Arnaud Duvermy},
      booktitle={Proceedings of AALTD, ECML Workshop on Advanced Analytics and Learning on Temporal Data},
      year={2025},
      pages={16}
    }

----------------------------------------