mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-18 08:54:54 +03:00
Make workers work on Py3 (#4027)
This commit is contained in:
parent
8ddd0f273c
commit
381d2cfdf0
13 changed files with 64 additions and 62 deletions
1
changelog.d/4027.bugfix
Normal file
1
changelog.d/4027.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Workers now start on Python 3.
|
|
@ -17,6 +17,7 @@ import gc
|
||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
import psutil
|
||||||
from daemonize import Daemonize
|
from daemonize import Daemonize
|
||||||
|
|
||||||
from twisted.internet import error, reactor
|
from twisted.internet import error, reactor
|
||||||
|
@ -24,12 +25,6 @@ from twisted.internet import error, reactor
|
||||||
from synapse.util import PreserveLoggingContext
|
from synapse.util import PreserveLoggingContext
|
||||||
from synapse.util.rlimit import change_resource_limit
|
from synapse.util.rlimit import change_resource_limit
|
||||||
|
|
||||||
try:
|
|
||||||
import affinity
|
|
||||||
except Exception:
|
|
||||||
affinity = None
|
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -89,15 +84,20 @@ def start_reactor(
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
logger.info("Running")
|
logger.info("Running")
|
||||||
if cpu_affinity is not None:
|
if cpu_affinity is not None:
|
||||||
if not affinity:
|
# Turn the bitmask into bits, reverse it so we go from 0 up
|
||||||
quit_with_error(
|
mask_to_bits = bin(cpu_affinity)[2:][::-1]
|
||||||
"Missing package 'affinity' required for cpu_affinity\n"
|
|
||||||
"option\n\n"
|
cpus = []
|
||||||
"Install by running:\n\n"
|
cpu_num = 0
|
||||||
" pip install affinity\n\n"
|
|
||||||
)
|
for i in mask_to_bits:
|
||||||
logger.info("Setting CPU affinity to %s" % cpu_affinity)
|
if i == "1":
|
||||||
affinity.set_process_affinity_mask(0, cpu_affinity)
|
cpus.append(cpu_num)
|
||||||
|
cpu_num += 1
|
||||||
|
|
||||||
|
p = psutil.Process()
|
||||||
|
p.cpu_affinity(cpus)
|
||||||
|
|
||||||
change_resource_limit(soft_file_limit)
|
change_resource_limit(soft_file_limit)
|
||||||
if gc_thresholds:
|
if gc_thresholds:
|
||||||
gc.set_threshold(*gc_thresholds)
|
gc.set_threshold(*gc_thresholds)
|
||||||
|
|
|
@ -178,6 +178,9 @@ def start(config_options):
|
||||||
|
|
||||||
setup_logging(config, use_worker_options=True)
|
setup_logging(config, use_worker_options=True)
|
||||||
|
|
||||||
|
# This should only be done on the user directory worker or the master
|
||||||
|
config.update_user_directory = False
|
||||||
|
|
||||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||||
|
|
||||||
database_engine = create_engine(config.database_config)
|
database_engine = create_engine(config.database_config)
|
||||||
|
|
|
@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
|
||||||
from synapse.http.site import SynapseSite
|
from synapse.http.site import SynapseSite
|
||||||
from synapse.metrics import RegistryProxy
|
from synapse.metrics import RegistryProxy
|
||||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||||
|
from synapse.replication.slave.storage._base import __func__
|
||||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||||
from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
||||||
|
@ -49,31 +50,31 @@ class PusherSlaveStore(
|
||||||
SlavedAccountDataStore
|
SlavedAccountDataStore
|
||||||
):
|
):
|
||||||
update_pusher_last_stream_ordering_and_success = (
|
update_pusher_last_stream_ordering_and_success = (
|
||||||
DataStore.update_pusher_last_stream_ordering_and_success.__func__
|
__func__(DataStore.update_pusher_last_stream_ordering_and_success)
|
||||||
)
|
)
|
||||||
|
|
||||||
update_pusher_failing_since = (
|
update_pusher_failing_since = (
|
||||||
DataStore.update_pusher_failing_since.__func__
|
__func__(DataStore.update_pusher_failing_since)
|
||||||
)
|
)
|
||||||
|
|
||||||
update_pusher_last_stream_ordering = (
|
update_pusher_last_stream_ordering = (
|
||||||
DataStore.update_pusher_last_stream_ordering.__func__
|
__func__(DataStore.update_pusher_last_stream_ordering)
|
||||||
)
|
)
|
||||||
|
|
||||||
get_throttle_params_by_room = (
|
get_throttle_params_by_room = (
|
||||||
DataStore.get_throttle_params_by_room.__func__
|
__func__(DataStore.get_throttle_params_by_room)
|
||||||
)
|
)
|
||||||
|
|
||||||
set_throttle_params = (
|
set_throttle_params = (
|
||||||
DataStore.set_throttle_params.__func__
|
__func__(DataStore.set_throttle_params)
|
||||||
)
|
)
|
||||||
|
|
||||||
get_time_of_last_push_action_before = (
|
get_time_of_last_push_action_before = (
|
||||||
DataStore.get_time_of_last_push_action_before.__func__
|
__func__(DataStore.get_time_of_last_push_action_before)
|
||||||
)
|
)
|
||||||
|
|
||||||
get_profile_displayname = (
|
get_profile_displayname = (
|
||||||
DataStore.get_profile_displayname.__func__
|
__func__(DataStore.get_profile_displayname)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@ from synapse.http.server import JsonResource
|
||||||
from synapse.http.site import SynapseSite
|
from synapse.http.site import SynapseSite
|
||||||
from synapse.metrics import RegistryProxy
|
from synapse.metrics import RegistryProxy
|
||||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
|
||||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||||
|
@ -147,7 +147,7 @@ class SynchrotronPresence(object):
|
||||||
and haven't come back yet. If there are poke the master about them.
|
and haven't come back yet. If there are poke the master about them.
|
||||||
"""
|
"""
|
||||||
now = self.clock.time_msec()
|
now = self.clock.time_msec()
|
||||||
for user_id, last_sync_ms in self.users_going_offline.items():
|
for user_id, last_sync_ms in list(self.users_going_offline.items()):
|
||||||
if now - last_sync_ms > 10 * 1000:
|
if now - last_sync_ms > 10 * 1000:
|
||||||
self.users_going_offline.pop(user_id, None)
|
self.users_going_offline.pop(user_id, None)
|
||||||
self.send_user_sync(user_id, False, last_sync_ms)
|
self.send_user_sync(user_id, False, last_sync_ms)
|
||||||
|
@ -156,9 +156,9 @@ class SynchrotronPresence(object):
|
||||||
# TODO Hows this supposed to work?
|
# TODO Hows this supposed to work?
|
||||||
pass
|
pass
|
||||||
|
|
||||||
get_states = PresenceHandler.get_states.__func__
|
get_states = __func__(PresenceHandler.get_states)
|
||||||
get_state = PresenceHandler.get_state.__func__
|
get_state = __func__(PresenceHandler.get_state)
|
||||||
current_state_for_users = PresenceHandler.current_state_for_users.__func__
|
current_state_for_users = __func__(PresenceHandler.current_state_for_users)
|
||||||
|
|
||||||
def user_syncing(self, user_id, affect_presence):
|
def user_syncing(self, user_id, affect_presence):
|
||||||
if affect_presence:
|
if affect_presence:
|
||||||
|
@ -208,7 +208,7 @@ class SynchrotronPresence(object):
|
||||||
) for row in rows]
|
) for row in rows]
|
||||||
|
|
||||||
for state in states:
|
for state in states:
|
||||||
self.user_to_current_state[row.user_id] = state
|
self.user_to_current_state[state.user_id] = state
|
||||||
|
|
||||||
stream_id = token
|
stream_id = token
|
||||||
yield self.notify_from_replication(states, stream_id)
|
yield self.notify_from_replication(states, stream_id)
|
||||||
|
|
|
@ -82,9 +82,6 @@ CONDITIONAL_REQUIREMENTS = {
|
||||||
"psutil": {
|
"psutil": {
|
||||||
"psutil>=2.0.0": ["psutil>=2.0.0"],
|
"psutil>=2.0.0": ["psutil>=2.0.0"],
|
||||||
},
|
},
|
||||||
"affinity": {
|
|
||||||
"affinity": ["affinity"],
|
|
||||||
},
|
|
||||||
"postgres": {
|
"postgres": {
|
||||||
"psycopg2>=2.6": ["psycopg2"]
|
"psycopg2>=2.6": ["psycopg2"]
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,8 @@
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
from synapse.storage._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.engines import PostgresEngine
|
from synapse.storage.engines import PostgresEngine
|
||||||
|
|
||||||
|
@ -23,6 +25,13 @@ from ._slaved_id_tracker import SlavedIdTracker
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def __func__(inp):
|
||||||
|
if six.PY3:
|
||||||
|
return inp
|
||||||
|
else:
|
||||||
|
return inp.__func__
|
||||||
|
|
||||||
|
|
||||||
class BaseSlavedStore(SQLBaseStore):
|
class BaseSlavedStore(SQLBaseStore):
|
||||||
def __init__(self, db_conn, hs):
|
def __init__(self, db_conn, hs):
|
||||||
super(BaseSlavedStore, self).__init__(db_conn, hs)
|
super(BaseSlavedStore, self).__init__(db_conn, hs)
|
||||||
|
|
|
@ -17,7 +17,7 @@ from synapse.storage import DataStore
|
||||||
from synapse.util.caches.expiringcache import ExpiringCache
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
|
|
||||||
from ._base import BaseSlavedStore
|
from ._base import BaseSlavedStore, __func__
|
||||||
from ._slaved_id_tracker import SlavedIdTracker
|
from ._slaved_id_tracker import SlavedIdTracker
|
||||||
|
|
||||||
|
|
||||||
|
@ -43,11 +43,11 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
|
||||||
expiry_ms=30 * 60 * 1000,
|
expiry_ms=30 * 60 * 1000,
|
||||||
)
|
)
|
||||||
|
|
||||||
get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
|
get_to_device_stream_token = __func__(DataStore.get_to_device_stream_token)
|
||||||
get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
|
get_new_messages_for_device = __func__(DataStore.get_new_messages_for_device)
|
||||||
get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
|
get_new_device_msgs_for_remote = __func__(DataStore.get_new_device_msgs_for_remote)
|
||||||
delete_messages_for_device = DataStore.delete_messages_for_device.__func__
|
delete_messages_for_device = __func__(DataStore.delete_messages_for_device)
|
||||||
delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__
|
delete_device_msgs_for_remote = __func__(DataStore.delete_device_msgs_for_remote)
|
||||||
|
|
||||||
def stream_positions(self):
|
def stream_positions(self):
|
||||||
result = super(SlavedDeviceInboxStore, self).stream_positions()
|
result = super(SlavedDeviceInboxStore, self).stream_positions()
|
||||||
|
|
|
@ -13,23 +13,14 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import six
|
|
||||||
|
|
||||||
from synapse.storage import DataStore
|
from synapse.storage import DataStore
|
||||||
from synapse.storage.end_to_end_keys import EndToEndKeyStore
|
from synapse.storage.end_to_end_keys import EndToEndKeyStore
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
|
|
||||||
from ._base import BaseSlavedStore
|
from ._base import BaseSlavedStore, __func__
|
||||||
from ._slaved_id_tracker import SlavedIdTracker
|
from ._slaved_id_tracker import SlavedIdTracker
|
||||||
|
|
||||||
|
|
||||||
def __func__(inp):
|
|
||||||
if six.PY3:
|
|
||||||
return inp
|
|
||||||
else:
|
|
||||||
return inp.__func__
|
|
||||||
|
|
||||||
|
|
||||||
class SlavedDeviceStore(BaseSlavedStore):
|
class SlavedDeviceStore(BaseSlavedStore):
|
||||||
def __init__(self, db_conn, hs):
|
def __init__(self, db_conn, hs):
|
||||||
super(SlavedDeviceStore, self).__init__(db_conn, hs)
|
super(SlavedDeviceStore, self).__init__(db_conn, hs)
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
from synapse.storage import DataStore
|
from synapse.storage import DataStore
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
|
|
||||||
from ._base import BaseSlavedStore
|
from ._base import BaseSlavedStore, __func__
|
||||||
from ._slaved_id_tracker import SlavedIdTracker
|
from ._slaved_id_tracker import SlavedIdTracker
|
||||||
|
|
||||||
|
|
||||||
|
@ -33,9 +33,9 @@ class SlavedGroupServerStore(BaseSlavedStore):
|
||||||
"_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(),
|
"_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(),
|
||||||
)
|
)
|
||||||
|
|
||||||
get_groups_changes_for_user = DataStore.get_groups_changes_for_user.__func__
|
get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user)
|
||||||
get_group_stream_token = DataStore.get_group_stream_token.__func__
|
get_group_stream_token = __func__(DataStore.get_group_stream_token)
|
||||||
get_all_groups_for_user = DataStore.get_all_groups_for_user.__func__
|
get_all_groups_for_user = __func__(DataStore.get_all_groups_for_user)
|
||||||
|
|
||||||
def stream_positions(self):
|
def stream_positions(self):
|
||||||
result = super(SlavedGroupServerStore, self).stream_positions()
|
result = super(SlavedGroupServerStore, self).stream_positions()
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
from synapse.storage import DataStore
|
from synapse.storage import DataStore
|
||||||
from synapse.storage.keys import KeyStore
|
from synapse.storage.keys import KeyStore
|
||||||
|
|
||||||
from ._base import BaseSlavedStore
|
from ._base import BaseSlavedStore, __func__
|
||||||
|
|
||||||
|
|
||||||
class SlavedKeyStore(BaseSlavedStore):
|
class SlavedKeyStore(BaseSlavedStore):
|
||||||
|
@ -24,11 +24,11 @@ class SlavedKeyStore(BaseSlavedStore):
|
||||||
"_get_server_verify_key"
|
"_get_server_verify_key"
|
||||||
]
|
]
|
||||||
|
|
||||||
get_server_verify_keys = DataStore.get_server_verify_keys.__func__
|
get_server_verify_keys = __func__(DataStore.get_server_verify_keys)
|
||||||
store_server_verify_key = DataStore.store_server_verify_key.__func__
|
store_server_verify_key = __func__(DataStore.store_server_verify_key)
|
||||||
|
|
||||||
get_server_certificate = DataStore.get_server_certificate.__func__
|
get_server_certificate = __func__(DataStore.get_server_certificate)
|
||||||
store_server_certificate = DataStore.store_server_certificate.__func__
|
store_server_certificate = __func__(DataStore.store_server_certificate)
|
||||||
|
|
||||||
get_server_keys_json = DataStore.get_server_keys_json.__func__
|
get_server_keys_json = __func__(DataStore.get_server_keys_json)
|
||||||
store_server_keys_json = DataStore.store_server_keys_json.__func__
|
store_server_keys_json = __func__(DataStore.store_server_keys_json)
|
||||||
|
|
|
@ -17,7 +17,7 @@ from synapse.storage import DataStore
|
||||||
from synapse.storage.presence import PresenceStore
|
from synapse.storage.presence import PresenceStore
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
|
|
||||||
from ._base import BaseSlavedStore
|
from ._base import BaseSlavedStore, __func__
|
||||||
from ._slaved_id_tracker import SlavedIdTracker
|
from ._slaved_id_tracker import SlavedIdTracker
|
||||||
|
|
||||||
|
|
||||||
|
@ -34,8 +34,8 @@ class SlavedPresenceStore(BaseSlavedStore):
|
||||||
"PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
|
"PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
|
||||||
)
|
)
|
||||||
|
|
||||||
_get_active_presence = DataStore._get_active_presence.__func__
|
_get_active_presence = __func__(DataStore._get_active_presence)
|
||||||
take_presence_startup_info = DataStore.take_presence_startup_info.__func__
|
take_presence_startup_info = __func__(DataStore.take_presence_startup_info)
|
||||||
_get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
|
_get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
|
||||||
get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
|
get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
|
||||||
|
|
||||||
|
|
2
synctl
2
synctl
|
@ -280,7 +280,7 @@ def main():
|
||||||
if worker.cache_factor:
|
if worker.cache_factor:
|
||||||
os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
|
os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
|
||||||
|
|
||||||
for cache_name, factor in worker.cache_factors.iteritems():
|
for cache_name, factor in iteritems(worker.cache_factors):
|
||||||
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
|
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
|
||||||
|
|
||||||
start_worker(worker.app, configfile, worker.configfile)
|
start_worker(worker.app, configfile, worker.configfile)
|
||||||
|
|
Loading…
Reference in a new issue