Add instance name to command

This commit is contained in:
Erik Johnston 2020-03-25 10:17:29 +00:00
parent 9ea391054f
commit f6e7daaac3
3 changed files with 17 additions and 11 deletions

View file

@ -86,7 +86,7 @@ class RdataCommand(Command):
Format::
RDATA <stream_name> <token> <row_json>
RDATA <stream_name> <instance_name> <token> <row_json>
The `<token>` may either be a numeric stream id OR "batch". The latter case
is used to support sending multiple updates with the same stream ID. This
@ -107,22 +107,27 @@ class RdataCommand(Command):
NAME = "RDATA"
def __init__(self, stream_name, token, row):
def __init__(self, stream_name, instance_name, token, row):
self.stream_name = stream_name
self.instance_name = instance_name
self.token = token
self.row = row
@classmethod
def from_line(cls, line):
stream_name, token, row_json = line.split(" ", 2)
stream_name, instance_name, token, row_json = line.split(" ", 3)
return cls(
stream_name, None if token == "batch" else int(token), json.loads(row_json)
stream_name,
instance_name,
None if token == "batch" else int(token),
json.loads(row_json),
)
def to_line(self):
return " ".join(
(
self.stream_name,
self.instance_name,
str(self.token) if self.token is not None else "batch",
_json_encoder.encode(self.row),
)
@ -142,17 +147,18 @@ class PositionCommand(Command):
NAME = "POSITION"
def __init__(self, stream_name, token):
def __init__(self, stream_name, instance_name, token):
self.stream_name = stream_name
self.instance_name = instance_name
self.token = token
@classmethod
def from_line(cls, line):
stream_name, token = line.split(" ", 1)
return cls(stream_name, int(token))
stream_name, instance_name, token = line.split(" ", 2)
return cls(stream_name, instance_name, int(token))
def to_line(self):
return " ".join((self.stream_name, str(self.token)))
return " ".join((self.stream_name, self.instance_name, str(self.token)))
class ErrorCommand(Command):

View file

@ -134,7 +134,7 @@ class ReplicationClientHandler:
for stream_name, stream in self.streams.items():
current_token = stream.current_token()
self.send_command(PositionCommand(stream_name, current_token))
self.send_command(PositionCommand(stream_name, "master", current_token))
async def on_USER_SYNC(self, cmd: UserSyncCommand):
user_sync_counter.inc()
@ -326,7 +326,7 @@ class ReplicationClientHandler:
We need to check if the client is interested in the stream or not
"""
self.send_command(RdataCommand(stream_name, token, data))
self.send_command(RdataCommand(stream_name, "master", token, data))
class ReplicationDataHandler:

View file

@ -122,7 +122,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
We need to check if the client is interested in the stream or not
"""
self.send_command(RdataCommand(stream_name, token, data))
self.send_command(RdataCommand(stream_name, "master", token, data))
class RedisFactory(txredisapi.SubscriberFactory):