import configparser from imaplib import IMAP4 import ssl import email import os from glob import glob from email.message import Message from sqlalchemy import create_engine, text import gpxpy config = configparser.ConfigParser() config.read("config.ini") db = create_engine( f"postgresql://{config['db']['username']}:{config['db']['password']}@{config['db']['host']}/{config['db']['database']}" ).connect() mail = IMAP4(host=config["mail"]["host"]) fitotrack_msg_filter = "ALL" def init_database(): with open("init.sql") as f: db.execute("\n".join(f.readlines())) def _get_sender(msg: Message) -> str: sender: str = msg.get("from") if " " in sender: sender = sender.split(" ") for field in sender: if "@" in field and "<" in field and ">" in field: return field[1:-1] return sender def get_gpx_files_from_mail(): mail.starttls(ssl.create_default_context()) mail.login(config["mail"]["username"], config["mail"]["password"]) mail.select() _, ids = mail.search(None, fitotrack_msg_filter) ids = ids[0].split() for i in ids: _, fetched = mail.fetch(i, "(RFC822)") email_message = email.message_from_bytes(fetched[0][1]) sender = _get_sender(email_message) for part in email_message.walk(): if ( part.get_content_maintype() == "multipart" or part.get_content_disposition() is None ): continue filename = part.get_filename() if filename: filename = f"{sender}_{filename}" if not os.path.exists(f"gpx_files/{filename}"): with open(f"gpx_files/{filename}", "wb") as f: print(f"creating {filename}") f.write(part.get_payload(decode=True)) mail.store(i, "+FLAGS", "\\Deleted") mail.expunge() mail.close() mail.logout() def process_gpx_files(): for filepath in glob("gpx_files/*.gpx"): owner = os.path.split(filepath)[-1].split("_workout-")[0] filename = f'workout-{os.path.split(filepath)[-1].split("_workout-")[1]}' print(f"Processing {filename}") if list( db.execute( text( "select exists(select from training where owner = :owner and filename = :filename)" ), dict( owner=owner, filename=filename, ), ), )[0][0]: os.remove(filepath) continue with open(filepath) as f: gpx_file = gpxpy.parse(f) if gpx_file.creator != "FitoTrack": raise ValueError("gpx file not generated by the FitoTrack app") training_id = list( db.execute( text( """ insert into training(owner, filename, type, description, moving_time, stopped_time, moving_distance, stopped_distance) values (:owner, :filename, :type, :description, :moving_time, :stopped_time, :moving_distance, :stopped_distance) returning id """ ), dict( owner=owner, filename=filename, type="cycling", # TODO other training types description=gpx_file.description, moving_time=gpx_file.get_moving_data().moving_time, stopped_time=gpx_file.get_moving_data().stopped_time, moving_distance=gpx_file.get_moving_data().moving_distance, stopped_distance=gpx_file.get_moving_data().stopped_distance, ), ) )[0][0] for track in gpx_file.tracks: for segment in track.segments: for point in segment.points: db.execute( text( """ insert into training_data(training_id, t, geog, speed, elevation) values (:training_id, :t, :geog, :speed, :elevation) """ ), dict( training_id=training_id, t=point.time, geog=f"POINT({point.latitude} {point.longitude})", speed=point.speed, elevation=point.elevation, ), ) os.remove(filepath) def main(): try: os.mkdir("gpx_files") except FileExistsError: pass init_database() get_gpx_files_from_mail() with db.begin(): process_gpx_files() if __name__ == "__main__": try: main() except (KeyboardInterrupt, EOFError): pass