前段时间有个需求,项目上有两个不同的进程,一个是API接口进程,一个是内部服务的进程。两个进程共用一个白名单数据库,内部服务进程要处理大量的数据(数亿级),每次处理一行数据都要检测一下是否在白名单内,因此要做一个白名单缓存来加速运行。为了实现白名单同步,要求当API接收到修改白名单请求的时候,通知一下内部服务进程,刷新白名单缓存。
直接上代码:
import socket
from threading import Thread
from time import sleep
# 当前存在问题:
# 1,UDP无法保证数据包交付
# 2,UDP监听失败不能重试
class Client:
def __init__(self, port=8742):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.host, self.port = "127.0.0.1", port
def send_notify(self, msg):
"""
发送通知
:param msg:通知内容
:return:
"""
self.serv.sendto(msg.encode(), (self.host, self.port))
class Server:
def __init__(self, callback, port=8742):
self.host, self.port = "127.0.0.1", port
self.thread_stop_sign = False
self.callback = callback
def server_handle(self, callback):
"""
UDP处理函数,收到UDP报文时会调callback函数
:param callback:回调函数
:return:
"""
serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
serv.bind((self.host, self.port))
while True:
message, client_address = serv.recvfrom(2048)
message = message.decode()
if message == "stop" and self.thread_stop_sign:
self.thread_stop_sign = False
serv.close()
return
callback(message)
def start(self):
"""
守护线程启动
:return:线程成功启动,返回True,反之返回false
"""
self.thread = Thread(target=self.server_handle, args=(self.callback,))
self.thread.setDaemon(True)
self.thread.start()
def stop(self):
"""
守护线程启动
:return:线程关闭
"""
self.thread_stop_sign = True
serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
serv.sendto(b"stop", (self.host, self.port))
这样,在内部服务进程开一个UDP服务器,并设置好回调函数
def func(s):
func_table = {
"aaa": aaa,
"bbb": bbb,
... # 这里是个函数表,接受到某个字段就执行某个函数
}
if s in func_table.keys():
func_table[s]()
server = Server()
server.start(callback = func)
然后在api接口的相关地方开一个Client
client = Client()
即可进行通讯
client.send_notify("aaa") # 通知server执行aaa()
这里因为仅仅是简单的刷新白名单的需求,没有做队列,如有需要,可以修改server的代码,使用python的queue实现一个消息队列。