Liu/listen.py

93 lines
3.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import socket
import threading
class VoltageReceiver():
def __init__(self):
self.data = "" # 数据存储
self.active = False # 控制线程的开关
self.dataready = 0 # 数据是否准备好的标志位
self.thread = None # 线程存储
self.sock = None # 套接字存储
def receive_data(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.bind(('127.0.0.1', 9996))
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.listen(1)
while self.active: # 只有当 self.active 为 True 时,才开始监听
conn, addr = self.sock.accept()
while conn:
try:
data = conn.recv(1024)
except socket.error as e:
print("Remote host has disconnected.") # 打印提醒
break
if not data or not self.active: # 如果没有数据或 self.active 为 False 则跳出循环
break
data = data.decode()
if self.dataready:
# 新的数据已经准备好,而旧的数据还没被取走,发出警告
self.alert()
with threading.Lock(): # 使用锁确保线程安全
self.data = data
self.dataready = 1
conn.close()
def start(self): # 开始监听
if not self.thread or not self.thread.is_alive(): # 如果线程未创建或已结束,那么创建一个新线程
self.active = True
self.thread = threading.Thread(target=self.receive_data)
self.thread.start()
def stop(self): # 停止监听
self.active = False
if self.sock: # 关闭套接字,使得 sock.accept() 退出阻塞
self.sock.close()
def get_data(self): # 获取数据
with threading.Lock(): # 使用锁确保线程安全
self.dataready = 0
return self.data
def alert(self): # 报警函数
print("Warning: New data has arrived but the old data has not been taken away.")
def write_to_file(data, filename):
"""
This function write data to a local .txt file
:param data: str, data to be written to the file
:param filename: str, name of the file
"""
with open(filename, 'w') as file:
file.write(data)
def read_from_file(filename):
"""
This function read data from a local .txt file
:param filename: str, name of the file
:return: str, data read from the file
"""
with open(filename, 'r') as file:
data = file.read().replace('\n', '')
return data
if __name__ == "__main__": # 测试代码
receiver = VoltageReceiver() # 创建一个 VoltageReceiver 的实例
receiver.start() # 开始监听
while receiver.active:
if receiver.dataready: # 当数据准备好时,打印数据
data = read_from_file('test.txt')
if data == '#' or data == '':
print(receiver.get_data())
write_to_file(receiver.get_data(), "test.txt")
else:
pass
receiver.stop() # 停止监听