Лаб. работа “Организация потоков, параллельной обработки, синхронизации и распределённой обработки синхронизуемых участков кода”
Цель работы
Реализовать паттерн «Производитель-потребитель» для асинхронной обработки сетевых пакетов. Организовать потокобезопасную очередь с использованием QMutex и QWaitCondition для синхронизации доступа со стороны потоков-производителей и потоков-потребителей.
Теоретические сведения
Потоки в Qt
QThread — класс Qt для создания и управления потоками выполнения. Два основных подхода:
- Наследование от QThread — переопределение метода
run() - Перемещение QObject в поток — через
moveToThread()
Синхронизация: QMutex
QMutex — мьютекс для взаимного исключения. Гарантирует, что только один поток одновременно может выполнять защищённый участок кода.
Ожидание и уведомление: QWaitCondition
QWaitCondition позволяет потокам ожидать определённого условия и быть разбуженными другими потоками.
QMutex mutex;
QWaitCondition condition;
bool dataReady = false;
// Поток-потребитель
void consumer() {
mutex.lock();
while (!dataReady)
condition.wait(&mutex); // ожидание, мьютекс освобождается
// обработка данных
dataReady = false;
mutex.unlock();
}
// Поток-производитель
void producer() {
mutex.lock();
// добавление данных
dataReady = true;
condition.wakeAll(); // уведомление ожидающих потоков
mutex.unlock();
}Паттерн «Производитель-потребитель»
Конфигурация CMake для Qt
Задание для выполнения
Реализовать многопоточное приложение, моделирующее обработку сетевых пакетов по паттерну «Производитель-потребитель».
Требования к программе
- Создать класс
NetworkPacket:- поля:
sourceIp(string),destIp(string),payload(string),timestamp(QString),packetId(int, автоинкрементный) - метод
toString()— строковое представление пакета
- поля:
- Создать класс
ThreadSafeQueue:- закрытые поля:
queue<NetworkPacket>,QMutex,QWaitCondition,bool running - метод
push(const NetworkPacket&)— добавить пакет с уведомлением потребителей (wakeOne()) - метод
pop()— извлечь пакет (ожидание, если очередь пуста, черезwait()) - метод
size()— текущий размер очереди (под мьютексом) - метод
stop()— установитьrunning = falseиwakeAll()для завершения ожидающих потоков - метод
isEmpty()— проверка пустоты очереди
- закрытые поля:
- Создать класс
Producer(наследникQThread):- закрытые поля:
ThreadSafeQueue& queue,string ip,int packetsCount,int intervalMs - метод
run()— генерируетpacketsCountпакетов с указанным IP-адресом источника, случайным IP-адресом получателя и случайной нагрузкой, добавляет их в очередь с задержкойintervalMs. Выводит в консоль[Producer <ip>] Отправлен пакет #<id>.
- закрытые поля:
- Создать класс
Consumer(наследникQThread):- закрытые поля:
ThreadSafeQueue& queue,string name,int processedCount - метод
run()— в цикле извлекает пакеты из очереди и «обрабатывает» их (имитация: задержка 50–200 мс, вывод[Consumer <name>] Обработан пакет #<id> от <sourceIp> → <destIp>). Завершается приstop()очереди и её опустошении. - метод
getProcessedCount()— количество обработанных пакетов
- закрытые поля:
- В функции
main():- Создать общую очередь
ThreadSafeQueue - Создать 3 потока-производителя (разные IP, по 10 пакетов каждый, интервал 100 мс)
- Создать 2 потока-потребителя
- Запустить все потоки
- Дождаться завершения производителей (
wait()) - После завершения производителей вызвать
queue.stop() - Дождаться завершения потребителей
- Вывести статистику: сколько пакетов обработал каждый потребитель
- Создать общую очередь
Пример ожидаемого вывода
=== Запуск системы обработки пакетов ===
[Producer 192.168.1.1] Отправлен пакет #1
[Producer 10.0.0.5] Отправлен пакет #2
[Producer 172.16.0.1] Отправлен пакет #3
[Consumer Worker-1] Обработан пакет #1 от 192.168.1.1 → 10.0.0.2
[Consumer Worker-2] Обработан пакет #2 от 10.0.0.5 → 192.168.1.3
[Producer 192.168.1.1] Отправлен пакет #4
...
=== Все производители завершили работу ===
[Consumer Worker-1] Обработан пакет #29 от 172.16.0.1 → 10.0.0.1
[Consumer Worker-2] Обработан пакет #30 от 192.168.1.1 → 172.16.0.2
=== Статистика ===
Сгенерировано пакетов: 30
Worker-1 обработал: 16 пакетов
Worker-2 обработал: 14 пакетов
Индивидуальные задания
Вариант 1. Обработка DNS-запросов
Элемент очереди: DNSQuery (домен, тип записи (A/AAAA/CNAME/MX), IP-клиента, queryId — автоинкрементный). Метод toString(): [DNS] #<id> <domain> <type> от <clientIP>.
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): DNS-резолверы (каждый генерирует запросы для своего набора доменов с интервалом 100 мс, по 10 запросов). Вывод: [Resolver <name>] DNS-запрос #<id>: <домен> (<тип>) от <IP>.
Потребители (2): DNS-сервера (обрабатывают запрос — имитация задержки 50–150 мс, вывод [Server-<name>] Разрешено: <domain> → <IP>).
Статистика: запросов обработано каждым сервером.
Вариант 2. Обработка сетевых пакетов
Элемент очереди: NetworkPacket (sourceIp, destIp, payload, timestamp, packetId)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): генерируют пакеты с указанным IP-источником, случайным IP-получателем и случайной нагрузкой, добавляют в очередь с задержкой. Вывод: [Producer <ip>] Отправлен пакет #<id>.
Потребители (2): извлекают пакеты из очереди, имитируют обработку (задержка 50–200 мс). Вывод: [Consumer <name>] Обработан пакет #<id> от <sourceIp> → <destIp>.
Статистика: сколько пакетов обработал каждый потребитель.
Вариант 3. Обработка HTTP-запросов
Элемент очереди: HTTPRequest (URL, метод (GET/POST/PUT/DELETE), IP-клиента, размер, requestId)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): веб-клиенты, генерируют запросы со случайными URL и методами. Вывод: [Client <name>] HTTP-запрос #<id>: <метод> <URL>.
Потребители (2): HTTP-сервера, обрабатывают запрос — генерируют код ответа (200 с вероятностью 80 %, 404 с вероятностью 20 %). Вывод: [Server <name>] Запрос #<id> → <код ответа>.
Статистика: запросов по методам (GET, POST, PUT, DELETE).
Вариант 4. Обработка почтовых сообщений
Элемент очереди: EmailMessage (от, кому, тема, размер, messageId)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): почтовые клиенты, генерируют письма со случайными отправителями, получателями и темами. Вывод: [Client <name>] Письмо #<id>: от <от> к <кому> — <тема>.
Потребители (2): SMTP-сервера, обрабатывают — отправляют (90 %) или отклоняют (10 %, если размер > 10 МБ). Вывод: [SMTPServer <name>] Письмо #<id>: отправлено / отклонено.
Статистика: писем обработано каждым сервером.
Вариант 5. Обработка IoT-данных
Элемент очереди: SensorData (ID устройства, тип датчика (temperature/humidity/motion), значение, timestamp)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): группы датчиков — producer-1 (температура, 15–35 °C), producer-2 (влажность, 30–90 %), producer-3 (движение, 0/1). Вывод: [Sensor <type>] Данные #<id>: устройство=<ID>, значение=<val>.
Потребители (2): обработчики — фильтруют аномальные значения, сохраняют. Вывод: [Handler <name>] Обработаны данные #<id>: <тип>=<val> → сохранено/фильтровано.
Статистика: показаний по типам датчиков.
Вариант 6. Обработка логов
Элемент очереди: LogEntry (уровень (DEBUG/INFO/WARNING/ERROR), источник, сообщение, timestamp)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): модули приложения — web, db, auth — генерируют логи разных уровней. Вывод: [<module>] Log #<id>: [<уровень>] <сообщение>.
Потребители (2): writer’а — writer-1 записывает ERROR/WARNING в errors.log, writer-2 записывает INFO/DEBUG в app.log. Вывод: [Writer <name>] Записан лог #<id> в <файл>.
Статистика: записей по уровням логирования.
Вариант 7. Обработка транзакций
Элемент очереди: Transaction (ID, тип (debit/credit), сумма, аккаунт)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): терминалы оплаты, генерируют транзакции со случайными суммами (100–50000) и типами. Вывод: [Terminal <name>] Транзакция #<id>: <тип> <сумма> ₽, аккаунт <аккаунт>.
Потребители (2): обработчики платежей — выполняют транзакцию, проверяют баланс. Вывод: [Processor <name>] Транзакция #<id>: выполнена / отказано.
Статистика: сумма по типам транзакций (debit/credit).
Вариант 8. Обработка видеопотоков
Элемент очереди: VideoFrame (ID камеры, разрешение, размер (байт), timestamp, frameId)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): IP-камеры, генерируют кадры с различным разрешением (720p/1080p/4K). Вывод: [Camera <id>] Кадр #<frameId>: <разрешение>, <размер> байт.
Потребители (2): обработчики — сжимают и сохраняют кадры. Вывод: [Processor <name>] Кадр #<frameId> камеры <id> обработан (<разрешение>).
Статистика: кадров с каждой камеры.
Вариант 9. Обработка файловых операций
Элемент очереди: FileOperation (имя файла, тип (read/write/delete), размер, opId)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): FTP-клиенты, генерируют файловые операции со случайными именами файлов и типами. Вывод: [FTPClient <name>] Операция #<id>: <тип> <файл> (<размер> КБ).
Потребители (2): файловые сервера, выполняют операции. Вывод: [FileServer <name>] Операция #<id>: <тип> <файл> — выполнена / ошибка.
Статистика: операций по типам (read/write/delete).
Вариант 10. Обработка API-запросов
Элемент очереди: APIRequest (endpoint, метод (GET/POST/PUT/DELETE), параметры (map<string,string>), IP-клиента, requestId)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): микросервисы (auth-service, order-service, user-service), генерируют запросы к различным endpoint’ам. Вывод: [Service <name>] API-запрос #<id>: <метод> <endpoint>.
Потребители (2): API-шлюза, маршрутизируют и обрабатывают запросы. Вывод: [Gateway <name>] Запрос #<id>: <endpoint> → 200 / 404 / 500.
Статистика: запросов по endpoint.
Вариант 11. Обработка событий безопасности
Элемент очереди: SecurityEvent (IP-источник, тип (intrusion/scan/DDoS), уровень угрозы (low/medium/high/critical), timestamp)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): IDS-сенсоры (firewall, waf, honeypot), генерируют события безопасности. Вывод: [Sensor <name>] Событие #<id>: <тип> от <IP>, угроза: <уровень>.
Потребители (2): анализаторы — фильтруют события (пропускают medium+), оповещают. Вывод: [Analyzer <name>] Событие #<id>: <тип> — оповещение / проигнорировано.
Статистика: событий по типам (intrusion/scan/DDoS).
Вариант 12. Обработка чат-сообщений
Элемент очереди: ChatMessage (отправитель, получатель (или канал), текст, timestamp, msgId)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): комнаты чата (#general, #dev, #random), генерируют сообщения от случайных пользователей. Вывод: [Room <name>] Сообщение #<id>: <отправитель> → <текст>.
Потребители (2): обработчики — handler-1 отвечает за логирование, handler-2 за маршрутизацию. Вывод: [Handler <name>] Сообщение #<id> из <room>: залогировано / маршрутизировано.
Статистика: сообщений по комнатам.
Вариант 13. Обработка заданий печати
Элемент очереди: PrintJob (документ, принтер, страницы, приоритет (low/normal/high), jobId)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): отделы (бухгалтерия, IT, менеджмент), генерируют задания печати с разными приоритетами. Вывод: [Dept <name>] Задание #<id>: «<документ>», <страницы> стр., приоритет: <приоритет>.
Потребители (2): принт-сервера, обрабатывают задания. Вывод: [PrintServer <name>] Задание #<id>: «<документ>» напечатано (<страницы> стр.).
Статистика: страниц на каждый сервер.
Вариант 14. Обработка метрик мониторинга
Элемент очереди: Metric (имя метрики, значение (double), хост, timestamp)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): агенты мониторинга (server-1, server-2, server-3), генерируют метрики (CPU, RAM, Disk, Network). Вывод: [Agent <host>] Метрика: <имя>=<значение>.
Потребители (2): хранилища — storage-1 выполняет агрегацию, storage-2 проверяет пороги и генерирует алерты. Вывод: [Storage <name>] Метрика <имя> хоста <host>: сохранена / алерт!.
Статистика: метрик по хостам.
Вариант 15. Обработка резервных копий
Элемент очереди: BackupTask (источник, назначение, размер (МБ), тип (full/incremental), taskId)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): файловые сервера, генерируют задачи бэкапа с разным типом и размером. Вывод: [Server <name>] Задача #<id>: <источник> → <назначение>, <тип>, <размер> МБ.
Потребители (2): backup-сервера, выполняют копирование. Вывод: [Backup <name>] Задача #<id>: <тип> бэкап <источник> завершён (<размер> МБ).
Статистика: объём данных по типам (full/incremental).
Вариант 16. Обработка VPN-туннелей
Элемент очереди: VPNTunnel (клиент, сервер, протокол (OpenVPN/WireGuard/IPSec), объём трафика (МБ))
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): VPN-шлюза (gateway-1, gateway-2, gateway-3), генерируют записи о туннелях с различным трафиком. Вывод: [Gateway <name>] Туннель: <клиент> ↔︎ <сервер>, <протокол>, <объём> МБ.
Потребители (2): монитора трафика, анализируют записи. Вывод: [Monitor <name>] Туннель <клиент>→<сервер>: <объём> МБ — OK / превышен лимит.
Статистика: трафик по протоколам.
Вариант 17. Обработка SSH-команд
Элемент очереди: SSHCommand (хост, пользователь, команда, статус (pending/executed/failed))
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): администратора (admin-1, admin-2, admin-3), генерируют команды для различных хостов. Вывод: [Admin <name>] Команда #<id>: <пользователь>@<хост> $ <команда>.
Потребители (2): SSH-сервера, выполняют команды. Вывод: [SSHServer <name>] Команда #<id> на <хост>: <команда> — выполнена / ошибка.
Статистика: команд по серверам (хостам).
Вариант 18. Обработка CDN-запросов
Элемент очереди: CDNRequest (URL, клиентский IP, edge-сервер, размер (КБ), requestId)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): edge-сервера (edge-eu, edge-us, edge-asia), генерируют запросы к origin. Вывод: [Edge <name>] Запрос #<id>: <URL> от <IP>, <размер> КБ.
Потребители (2): origin-сервера, обслуживают запросы (кэш–hit 70 % / cache-miss 30 %). Вывод: [Origin <name>] Запрос #<id>: <URL> — кэш-hit / cache-miss (передан клиенту).
Статистика: запросов по edge-серверам.
Вариант 19. Обработка WebSocket-фреймов
Элемент очереди: WSFrame (сессия, тип (text/binary/close), данные, размер (байт), frameId)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): WebSocket-сервера (ws-chat, ws-stream, ws-api), генерируют фреймы различных типов. Вывод: [WSServer <name>] Фрейм #<id>: сессия=<сессия>, тип=<тип>, <размер> байт.
Потребители (2): обработчика фреймов — handler-1 обрабатывает text, handler-2 обрабатывает binary/close. Вывод: [Handler <name>] Фрейм #<id>: <тип> — обработан.
Статистика: фреймов по типам (text/binary/close).
Вариант 20. Обработка задач CI/CD
Элемент очереди: CITask (проект, тип (build/test/deploy), статус (pending/running/done/failed), ветка, taskId)
Класс ThreadSafeQueue: QMutex + QWaitCondition, методы push(), pop(), size(), stop(), isEmpty()
Производители (3): репозитория (frontend, backend, infra), генерируют задачи CI/CD при «коммите». Вывод: [Repo <name>] Задача #<id>: <проект>/<ветка> — <тип>.
Потребители (2): CI-runner’а, выполняют задачи. Вывод: [Runner <name>] Задача #<id>: <проект> <тип> — выполнена / ошибка.
Статистика: задач по типам (build/test/deploy).
Порядок выполнения
- Создать проект C++ в Qt Creator (CMake), подключить модуль
Qt6::Core. - Реализовать классы
NetworkPacketиThreadSafeQueue. - Реализовать классы
ProducerиConsumerкак наследникиQThread. - В
main()создать и запустить все потоки, обеспечить корректное завершение. - Протестировать: убедиться, что все 30 пакетов обработаны, ни один не потерян.
- Скомпилировать, запустить несколько раз (результаты обработки могут отличаться — это нормально).
- Сохранить скриншоты.
Контрольные вопросы
- Что такое поток (thread) и чем он отличается от процесса?
- Какие проблемы могут возникнуть при одновременном доступе нескольких потоков к общим данным?
- Что такое мьютекс (mutex)? Как он решает проблему гонки данных (race condition)?
- Зачем нужен
QMutexLockerи какие преимущества он даёт по сравнению с ручным вызовомlock()/unlock()? - Что такое
QWaitCondition? В чём разница междуwait(),wakeOne()иwakeAll()? - Почему в методе
pop()используется циклwhile (!dataReady)вместоif (!dataReady)? - Что такое ложное пробуждение (spurious wakeup)?
- В каком порядке следует завершать потоки-производители и потоки-потребители? Почему?
- Как бы изменилась программа при добавлении ограничения на максимальный размер очереди?
Содержание отчёта
- Тема, цель и задание лабораторной работы.
- Схема взаимодействия потоков (производители → очередь → потребители).
- Текст программы с комментариями.
- Скриншоты результата выполнения (запуск, обработка, статистика).
- Ответы на контрольные вопросы.
