|
| 1 | +import logging |
| 2 | +import os |
| 3 | + |
| 4 | +import redis |
| 5 | + |
| 6 | +LOGGING_LEVEL = os.environ.get("LOGLEVEL", "INFO") |
| 7 | +logging.basicConfig( |
| 8 | + format="[%(asctime)s] %(levelname)-8s %(message)s", |
| 9 | + level=LOGGING_LEVEL, |
| 10 | +) |
| 11 | + |
| 12 | +REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379") |
| 13 | +SUB_KEY = "__keyspace@0__:tokens:*" |
| 14 | + |
| 15 | +# pub/sub clients do not allow the execution of non-pubsub commands |
| 16 | +# so, we're using a dedicated client for subscribing to keyspace events |
| 17 | +# and a dedicated client for non-pubsub commands, needed to handle the scoreboard logic |
| 18 | +sub_client = redis.Redis.from_url(REDIS_URL, decode_responses=True).pubsub() |
| 19 | +client = redis.Redis.from_url(REDIS_URL, decode_responses=True) |
| 20 | + |
| 21 | + |
| 22 | +def keyspace_event_handler(event): |
| 23 | + """Handles the keyspace events. |
| 24 | +
|
| 25 | + This is an example of the event format: |
| 26 | + { |
| 27 | + 'type': 'pmessage', |
| 28 | + 'pattern': '__keyspace@0__:tokens:*', |
| 29 | + 'channel': '__keyspace@0__:tokens:luca', |
| 30 | + 'data': 'sadd' |
| 31 | + } |
| 32 | + """ |
| 33 | + logging.debug(f"Received new event: {event}") |
| 34 | + event_type = event['data'] |
| 35 | + channel = event["channel"] |
| 36 | + logging.info(f"event >>> {event_type} on {channel}") |
| 37 | + affected_key = channel.split(":", maxsplit=1)[1] |
| 38 | + how_many = client.scard(affected_key) |
| 39 | + logging.info(f"Set cardinality {affected_key} is {how_many}") |
| 40 | + |
| 41 | + username = affected_key.split(":")[1] |
| 42 | + |
| 43 | + if how_many > 0: |
| 44 | + client.zadd("scoreboard", {username: how_many}) |
| 45 | + else: |
| 46 | + client.zrem("scoreboard", username) |
| 47 | + |
| 48 | + scoreboard = client.zrange("scoreboard", 0, -1, desc=True, withscores=True) |
| 49 | + logging.info(f"Scores: {scoreboard}") |
| 50 | + |
| 51 | + |
| 52 | +def main(): |
| 53 | + sub_client.psubscribe(**{SUB_KEY: keyspace_event_handler}) |
| 54 | + pubsub_worker_thread = None |
| 55 | + try: |
| 56 | + logging.debug("Waiting for events...") |
| 57 | + pubsub_worker_thread = sub_client.run_in_thread(sleep_time=.01) |
| 58 | + pubsub_worker_thread.join() |
| 59 | + except KeyboardInterrupt: |
| 60 | + pass |
| 61 | + finally: |
| 62 | + # gracefully stops the clients, for example when receives a `KeyboardInterrupt` (e.g. CTRL + c) |
| 63 | + logging.debug("Stopping the application...") |
| 64 | + if pubsub_worker_thread: |
| 65 | + pubsub_worker_thread.stop() |
| 66 | + client.close() |
| 67 | + |
| 68 | + |
| 69 | +if __name__ == "__main__": |
| 70 | + main() |
0 commit comments