Загрузка...

The server part stops responding to requests after 1-2 days without interruption

Thread in Python created by Vladeristo Dec 5, 2024. 175 views

  1. Vladeristo
    Vladeristo Topic starter Dec 5, 2024 1 Jan 4, 2018
    Перестаёт отвечать на запросы клиента спустя 1-2 суток беспрерывной работы. Ошибку никакую не выдаёт. В чём может быть причина?
    Иногда помогает просто прожатый Enter на клавиатуре. А иногда нет :finger_down:



    Code
    from flask import Flask, request, jsonify
    import json
    import os
    import threading
    import logging
    from datetime import datetime, timedelta
    from waitress import serve

    # Flask expects the import name of the current module; the bare `name`
    # is undefined and raises NameError at import time.
    app = Flask(__name__)

    DATA_FILE = 'data.json'            # JSON file holding the password store
    LOG_FILE = 'access.log'            # target of the file log handler
    PASSWORDS_FILE = 'passwords.txt'   # text dump of passwords, MACs and expiry
    lock = threading.Lock()            # taken by load_data() and save_data() around file access

    # Logging setup with colored output
    class CustomFormatter(logging.Formatter):
        """Formatter that wraps the log message in an ANSI color picked by level.

        Only the message part is colored; the rest of the format string
        (for example the timestamp) is left untouched.
        """

        # ANSI escape sequences keyed by level name
        COLOR_CODES = {
            'INFO': '\033[32m',     # green
            'WARNING': '\033[33m',  # yellow
            'ERROR': '\033[31m',    # red
            'RESET': '\033[0m',     # reset color
        }

        def format(self, record):
            """Return the formatted record with its message wrapped in color codes.

            The record is shared by every handler attached to the logger, so
            ``record.msg`` is restored before returning. Otherwise the escape
            codes would leak into handlers that run later and would pile up
            each time the same record is formatted again.
            """
            reset = self.COLOR_CODES['RESET']
            # Levels without a dedicated color fall back to the reset sequence.
            color = self.COLOR_CODES.get(record.levelname, reset)
            original_msg = record.msg
            record.msg = f"{color}{original_msg}{reset}"
            try:
                return super().format(record)
            finally:
                record.msg = original_msg

    # Root logger setup: every record at INFO or above goes to two handlers.
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Console handler: formats records through CustomFormatter.
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(CustomFormatter('%(asctime)s - %(message)s'))
    logger.addHandler(console_handler)

    # File handler: writes to LOG_FILE through a standard logging.Formatter.
    # It is attached after the console handler, so it sees each record second.
    file_handler = logging.FileHandler(LOG_FILE)
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
    file_handler.setLevel(logging.INFO)
    logger.addHandler(file_handler)

    def load_data():
        """Read the password store from DATA_FILE.

        Returns:
            The decoded JSON content of DATA_FILE, or an empty dict when the
            file does not exist or does not contain valid JSON.
        """
        with lock:
            # Open directly instead of testing os.path.exists() first: the file
            # could disappear between the check and the open.
            try:
                with open(DATA_FILE, 'r') as f:
                    return json.load(f)
            except (FileNotFoundError, json.JSONDecodeError):
                return {}

    def save_data(data):
        """Write the password store to DATA_FILE and refresh the text dump.

        Args:
            data: JSON-serializable object to store; it is also passed to
                save_passwords_to_file().

        The JSON is written to a temporary sibling file and then moved over
        DATA_FILE with os.replace(). Opening DATA_FILE itself with mode 'w'
        truncates it immediately, so a failure in the middle of the dump would
        leave invalid JSON behind, which load_data() reads as an empty store.
        """
        tmp_path = DATA_FILE + '.tmp'
        with lock:
            with open(tmp_path, 'w') as f:
                json.dump(data, f)
            # The temporary file is closed at this point, so it can be renamed.
            os.replace(tmp_path, DATA_FILE)
            save_passwords_to_file(data)

    def save_passwords_to_file(data):
        """Write every password with its MAC address and expiry to PASSWORDS_FILE.

        The file is rewritten from scratch on each call, one line per password.
        An entry whose 'expires_at' is empty is reported as "Never".
        """
        with open(PASSWORDS_FILE, 'w') as out:
            entries = data['passwords'] if 'passwords' in data else {}
            for pwd, info in entries.items():
                expiry = info['expires_at'] or "Never"
                out.write(f"Password: {pwd}, MAC Address: {info['mac']}, Expires At: {expiry}\n")

    def remove_expired_passwords():
        """Background task that deletes expired passwords; never returns.

        Every 60 seconds the store is loaded, each entry whose 'expires_at'
        timestamp ('%Y-%m-%d %H:%M:%S') is in the past is removed, and the
        store is saved again if anything was removed.

        A failure in one pass (unreadable file, malformed timestamp, ...) is
        logged and the loop carries on. Without that guard the first exception
        would end the thread and expiry cleanup would stop with no visible sign.
        """
        # A single Event used only as a timer: nothing in this function sets
        # it, so wait(60) blocks for the full timeout.
        pause = threading.Event()

        while True:
            try:
                # NOTE(review): load_data() and save_data() take the lock
                # separately, so a change saved by another thread between the
                # two calls is overwritten here - confirm whether that matters.
                data = load_data()
                current_time = datetime.now()
                changed = False

                if 'passwords' in data:
                    # Iterate over a copy of the keys because entries are deleted in the loop.
                    for password in list(data['passwords']):
                        expires_at = data['passwords'][password]['expires_at']
                        if expires_at and datetime.strptime(expires_at, '%Y-%m-%d %H:%M:%S') < current_time:
                            del data['passwords'][password]
                            logger.info(f"Password '{password}' expired and was deleted.")
                            changed = True

                if changed:
                    save_data(data)
            except Exception:
                # Thread top-level boundary: report and keep the cleanup loop alive.
                logger.exception("Failed to remove expired passwords")

            # Check every 60 seconds
            pause.wait(60)

    @app.route('/verify', methods=['POST'])
    def verify():
        """Check a password / MAC address pair sent by a client.

        Expects a JSON object with 'mac_address' and 'password'. Missing fields
        default to 'Unknown' and 'None'. A body that is absent, not valid JSON
        or not a JSON object is treated as an empty object, so the client gets
        a normal JSON refusal instead of an unhandled server error.

        Returns:
            A JSON response with 'access_granted' and, on refusal, 'error'.
            On the first successful use of a password, the client's MAC address
            is bound to it; later requests must present the same MAC address.
        """
        data = load_data()
        # silent=True makes get_json() return None instead of raising when the
        # body is missing or cannot be parsed.
        req_data = request.get_json(silent=True)
        if not isinstance(req_data, dict):
            req_data = {}
        mac_address = req_data.get('mac_address', 'Unknown')
        password = req_data.get('password', 'None')
        client_ip = request.remote_addr
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        def log_event(success, reason):
            """Log one verification attempt, colored by its outcome."""
            status = "Successful" if success else "Failed"
            color = '\033[32m' if success else '\033[31m'  # green for success, red for refusal
            log_message = (
                f"{color}Time: {current_time}, "
                f"IP: {client_ip}, "
                f"Password: {password}, "
                f"MAC Address: {mac_address}, "
                f"Status: {status}, "
                f"Reason: {reason}\033[0m"  # reset color
            )
            logger.info(log_message)

        if 'passwords' not in data:
            log_event(False, "No passwords set on server")
            return jsonify({'access_granted': False, 'error': 'No passwords set on server'})

        passwords = data['passwords']
        # A non-string JSON value (list, object, number) can never equal a
        # stored key, and an unhashable one would raise TypeError on lookup.
        if not isinstance(password, str) or password not in passwords:
            log_event(False, "Invalid password")
            return jsonify({'access_granted': False, 'error': 'Invalid password'})

        entry = passwords[password]
        expires_at = entry['expires_at']
        if expires_at and datetime.strptime(expires_at, '%Y-%m-%d %H:%M:%S') < datetime.now():
            del passwords[password]
            save_data(data)
            log_event(False, "Password expired")
            return jsonify({'access_granted': False, 'error': 'Password expired'})

        if entry['mac'] is None:
            # First use: bind the password to this client's MAC address.
            entry['mac'] = mac_address
            save_data(data)
            log_event(True, "Password registered with MAC address")
            return jsonify({'access_granted': True})

        if entry['mac'] == mac_address:
            log_event(True, "Access granted")
            return jsonify({'access_granted': True})

        log_event(False, "Password already used by another device")
        return jsonify({'access_granted': False, 'error': 'Password already used by another device'})

    def add_password(new_password, duration_days=None):
        """Add a password to the store, or reset an existing one.

        Args:
            new_password: Password to add. An existing entry with the same
                password is replaced, which clears its MAC address.
            duration_days: Lifetime in days counted from now, or None for a
                password that never expires.

        The expiry decision compares against None explicitly; a truthiness
        test would silently turn a duration of 0 into "never expires".
        """
        data = load_data()
        if 'passwords' not in data:
            data['passwords'] = {}

        if duration_days is not None:
            expires_at = (datetime.now() + timedelta(days=duration_days)).strftime(
                '%Y-%m-%d %H:%M:%S')
        else:
            expires_at = None

        data['passwords'][new_password] = {'mac': None, 'expires_at': expires_at}
        save_data(data)
        expiration_message = f"Expires at: {expires_at}" if expires_at else "No expiration"
        print(f"Password '{new_password}' added successfully. {expiration_message}")

    def delete_password(password):
        """Remove a password and its MAC address from the store.

        Prints a confirmation after the store has been saved, or a "not found"
        message when the store has no such password.
        """
        store = load_data()
        known = store['passwords'] if 'passwords' in store else {}

        if password not in known:
            print(f"Password '{password}' not found.")
            return

        del known[password]
        save_data(store)
        print(f"Password '{password}' and associated MAC address deleted successfully.")

    def list_passwords():
        """Print every stored password with its MAC address and expiry.

        Prints "No passwords found." when the store has no 'passwords' key.
        An entry whose 'expires_at' is empty is shown as "Never".
        """
        store = load_data()

        if 'passwords' not in store:
            print("No passwords found.")
            return

        for pwd, info in store['passwords'].items():
            expiry = info['expires_at'] or "Never"
            print(f"Password: {pwd}, MAC Address: {info['mac']}, Expires At: {expiry}")

    def command_listener():
        """Read administration commands from standard input in a loop.

        Supported commands:
            add <password> [<N>d]  add a password, optionally expiring in N days
            del <password>         delete a password
            list                   print all passwords

        The loop ends when standard input is closed. A command that fails is
        reported and the loop continues, so one bad line cannot stop the
        listener thread.
        """
        while True:
            try:
                command = input("Enter command: ").strip()
            except EOFError:
                # Standard input is closed; no further commands can arrive.
                return

            try:
                if command.startswith("add "):
                    # split() without an argument collapses repeated spaces, so
                    # "add  x" adds "x" rather than an empty password.
                    parts = command.split()
                    if len(parts) == 3 and parts[2].endswith('d'):
                        try:
                            duration_days = int(parts[2][:-1])
                        except ValueError:
                            print(f"Invalid duration '{parts[2]}'. Expected a number of days, e.g. 30d.")
                            continue
                        add_password(parts[1], duration_days)
                    else:
                        add_password(parts[1])
                elif command.startswith("del "):
                    _, password_to_delete = command.split(" ", 1)
                    delete_password(password_to_delete.strip())
                elif command == "list":
                    list_passwords()
                else:
                    print("Unknown command")
            except Exception as exc:
                # Thread top-level boundary: report the failure and keep listening.
                print(f"Command failed: {exc}")

    # Script entry point. The guard must compare the module's __name__ with
    # '__main__'; the bare `name` is undefined and raises NameError.
    if __name__ == '__main__':
        # Both helpers are daemon threads, so they do not keep the process
        # alive once the main thread (the Flask server) exits.
        threading.Thread(target=command_listener, daemon=True).start()
        threading.Thread(target=remove_expired_passwords, daemon=True).start()
        # NOTE(review): the host value looks like a redacted placeholder -
        # replace it with the real bind address before running.
        app.run(host='*************ip адрес', port=5000, ssl_context=('cert.pem', 'key.pem'))
     
  2. ItsJustSpike
    ItsJustSpike Dec 5, 2024 Боты боты.... https://lolz.live/threads/7525029/ 240 Dec 5, 2020
    **** вставь, уверен какие-то хоть да есть в этот момент, может и поможет кто
     
Top
Loading...