198 lines
5.8 KiB
Python
198 lines
5.8 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
import unicodedata
|
|
from dataclasses import dataclass
|
|
from datetime import date, timedelta
|
|
from pathlib import Path
|
|
|
|
from .config import ROOT, client_dir
|
|
from .history import now_local
|
|
from .table import print_table
|
|
|
|
|
|
GLOBAL_REMINDERS_PATH = ROOT / "reminders.jsonl"
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class Reminder:
|
|
id: str
|
|
created_at: str
|
|
due_date: str
|
|
text: str
|
|
client: str
|
|
status: str
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: dict) -> "Reminder":
|
|
return cls(
|
|
id=str(data.get("id", "")),
|
|
created_at=str(data.get("created_at", "")),
|
|
due_date=str(data.get("due_date", "")),
|
|
text=str(data.get("text", "")),
|
|
client=str(data.get("client", "")),
|
|
status=str(data.get("status", "active")),
|
|
)
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"id": self.id,
|
|
"created_at": self.created_at,
|
|
"due_date": self.due_date,
|
|
"text": self.text,
|
|
"client": self.client,
|
|
"status": self.status,
|
|
}
|
|
|
|
|
|
def normalize_text(value: str) -> str:
|
|
normalized = unicodedata.normalize("NFKD", value)
|
|
without_marks = "".join(ch for ch in normalized if not unicodedata.combining(ch))
|
|
return without_marks.lower()
|
|
|
|
|
|
def reminder_path(domain: str | None = None) -> Path:
|
|
if domain:
|
|
base = client_dir(domain)
|
|
return base / "reminders.jsonl"
|
|
return GLOBAL_REMINDERS_PATH
|
|
|
|
|
|
def load_reminders(domain: str | None = None, include_global: bool = False) -> list[Reminder]:
|
|
paths = []
|
|
if include_global:
|
|
paths.append(GLOBAL_REMINDERS_PATH)
|
|
paths.append(reminder_path(domain))
|
|
reminders: list[Reminder] = []
|
|
for path in paths:
|
|
if not path.exists():
|
|
continue
|
|
for line in path.read_text(encoding="utf-8").splitlines():
|
|
if not line.strip():
|
|
continue
|
|
try:
|
|
reminders.append(Reminder.from_dict(json.loads(line)))
|
|
except json.JSONDecodeError:
|
|
continue
|
|
reminders.sort(key=lambda item: (item.due_date, item.created_at, item.id))
|
|
return reminders
|
|
|
|
|
|
def parse_reminder_text(raw_text: str) -> tuple[date, str]:
|
|
text = " ".join(raw_text.split()).strip()
|
|
if not text:
|
|
raise ValueError("Brak tresci przypomnienia.")
|
|
|
|
today = now_local().date()
|
|
normalized = normalize_text(text)
|
|
due = today
|
|
matched_prefix = ""
|
|
|
|
patterns = [
|
|
(r"^za\s+(\d+)\s+dni(?:\s+|$)", "days"),
|
|
(r"^za\s+(\d+)\s+dzien(?:\s+|$)", "days"),
|
|
(r"^za\s+(\d+)\s+tygodnie(?:\s+|$)", "weeks"),
|
|
(r"^za\s+(\d+)\s+tygodni(?:\s+|$)", "weeks"),
|
|
(r"^za\s+(\d+)\s+tydzien(?:\s+|$)", "weeks"),
|
|
(r"^za\s+(\d+)\s+miesiace(?:\s+|$)", "months"),
|
|
(r"^za\s+(\d+)\s+miesiecy(?:\s+|$)", "months"),
|
|
(r"^za\s+(\d+)\s+miesiac(?:\s+|$)", "months"),
|
|
]
|
|
for pattern, unit in patterns:
|
|
match = re.search(pattern, normalized)
|
|
if not match:
|
|
continue
|
|
amount = int(match.group(1))
|
|
if unit == "days":
|
|
due = today + timedelta(days=amount)
|
|
elif unit == "weeks":
|
|
due = today + timedelta(weeks=amount)
|
|
else:
|
|
due = today + timedelta(days=amount * 30)
|
|
matched_prefix = text[: match.end()].strip()
|
|
break
|
|
|
|
if not matched_prefix:
|
|
if normalized.startswith("jutro"):
|
|
due = today + timedelta(days=1)
|
|
matched_prefix = text[:5].strip()
|
|
elif normalized.startswith("dzisiaj"):
|
|
due = today
|
|
matched_prefix = text[:7].strip()
|
|
|
|
note = text[len(matched_prefix) :].strip() if matched_prefix else text
|
|
note_normalized = normalize_text(note)
|
|
for prefix in [
|
|
"przypomnij mi o ",
|
|
"przypomnij o ",
|
|
"przypomnienie o ",
|
|
"o ",
|
|
]:
|
|
if note_normalized.startswith(prefix):
|
|
note = note[len(prefix) :].strip()
|
|
break
|
|
if not note:
|
|
note = text
|
|
return due, note
|
|
|
|
|
|
def add_reminder(raw_text: str, domain: str | None = None) -> Reminder:
|
|
due, note = parse_reminder_text(raw_text)
|
|
ts = now_local()
|
|
reminder = Reminder(
|
|
id=f"rem_{ts.strftime('%Y%m%d%H%M%S')}",
|
|
created_at=ts.isoformat(timespec="seconds"),
|
|
due_date=due.isoformat(),
|
|
text=note,
|
|
client=domain or "",
|
|
status="active",
|
|
)
|
|
path = reminder_path(domain)
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
with path.open("a", encoding="utf-8") as f:
|
|
f.write(json.dumps(reminder.to_dict(), ensure_ascii=False) + "\n")
|
|
return reminder
|
|
|
|
|
|
def active_reminders_for_client(domain: str) -> list[Reminder]:
|
|
return [item for item in load_reminders(domain, include_global=True) if item.status == "active"]
|
|
|
|
|
|
def reminder_status_label(reminder: Reminder) -> str:
|
|
today = now_local().date()
|
|
try:
|
|
due = date.fromisoformat(reminder.due_date)
|
|
except ValueError:
|
|
return "brak daty"
|
|
days = (due - today).days
|
|
if days < 0:
|
|
return f"po terminie {abs(days)} dni"
|
|
if days == 0:
|
|
return "dzisiaj"
|
|
if days == 1:
|
|
return "jutro"
|
|
return f"za {days} dni"
|
|
|
|
|
|
def print_client_reminders(domain: str, limit: int = 12) -> None:
|
|
reminders = active_reminders_for_client(domain)
|
|
if not reminders:
|
|
return
|
|
print("\nPrzypomnienia")
|
|
shown = reminders[:limit]
|
|
print_table(
|
|
["Termin", "Status", "Zakres", "Notatka"],
|
|
[
|
|
[
|
|
reminder.due_date,
|
|
reminder_status_label(reminder),
|
|
reminder.client or "globalne",
|
|
reminder.text,
|
|
]
|
|
for reminder in shown
|
|
],
|
|
)
|
|
if len(reminders) > len(shown):
|
|
print(f"... oraz {len(reminders) - len(shown)} kolejnych przypomnien")
|