• Multithreading? How?

    From pozz@21:1/5 to All on Fri Apr 28 12:31:36 2023
    I need to develop a Python application that is a sort of gateway between
    two networks: a "serial bus" network (think of a serial port or a USB connection) and an MQTT connection (in my case, the AWS IoT service).

    On the bus, a proprietary protocol is implemented. From the bus, the app
    knows the status of the system (think of a simple ON/OFF status).
    The user can retrieve the status of the system through MQTT: they send a
    message to the read/status MQTT topic and receive back a message with the
    current status on the read/status/reply MQTT topic. Of course, these are just examples.

    Conversely, when the app detects a status change reported from the
    serial bus (think of a transition from ON to OFF), it sends an MQTT message.

    I'm thinking of splitting the application into three classes: Bus, App and
    IoT. Bus and IoT are also threads.
    The Bus thread manages the serial protocol, while the IoT thread
    manages the MQTT connection with the broker (the server).

    However, I don't know whether this is a good architecture. Suppose the Bus
    thread receives a new status from the system. In the context of
    ThreadBus, the Bus object could call a method of the App object:

    app.set_status(new_status_from_the_bus)

    In the App I have:

    class App:
        ...
        def set_status(self, new_status):  # Could be called from ThreadBus
            if new_status != self.status:
                self.status = new_status
                # Do some actions on status change

        def get_status(self):  # Could be called from ThreadIoT
            return self.status

    Of course, the IoT object needs to know the current status of the system
    when a message is received from MQTT. So ThreadIoT could call
    app.get_status().

    I think this architecture has some problems with race conditions or
    thread synchronization. What happens if IoT calls get_status() exactly
    while set_status(), called by ThreadBus, is executing? If status is a big
    data structure, set_status() could be interrupted by get_status(), which
    could then read a corrupted status, because it was only partly
    updated by set_status().

    I know I can use locks or semaphores in get_status() and set_status(),
    but I don't know if this is a good approach. Consider that the system is complex: it doesn't consist of a single simple status. It has many, many parameters that are collected from the serial bus. Should I use a lock
    for every [get|set]_status(), [get|set]_dimensions(), [get|set]_battery_level(), [get|set]_mains_present(), and so on?
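
    One way to avoid a lock per accessor is to keep all parameters in a single immutable snapshot object guarded by one lock. The sketch below assumes that approach; the names SystemState, update() and snapshot() and the three example fields are illustrative and do not come from the original design.

```python
import threading
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class SystemState:
    # Hypothetical fields standing in for the real bus parameters.
    status: str = "OFF"
    battery_level: int = 0
    mains_present: bool = False


class App:
    def __init__(self):
        self._lock = threading.Lock()
        self._state = SystemState()

    def update(self, **changes):
        """Called from the bus thread. Returns True if anything changed."""
        with self._lock:
            new_state = replace(self._state, **changes)
            changed = new_state != self._state
            self._state = new_state
        return changed

    def snapshot(self):
        """Called from the MQTT thread. Returns a consistent, read-only copy."""
        with self._lock:
            return self._state
```

    Because the snapshot is frozen, a reader can keep using it after the lock is released without ever seeing a half-updated state.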


    Another possibility is to use a Queue for Bus and a Queue for IoT. The
    set_status(new_status) call from the Bus object would then become
    a put on the queue:

    app_queue.put({"type": "new_status", "data": ...})

    However, how would get_status() from IoT be transformed? How is the
    return value (the current status) actually retrieved?

    class IoT:
        ...
        def status_request_from_MQTT(self):
            app_queue.put({"type": "get_status"})
            # How to get the status to return?
            return current_status

    Should the app put the status on the same queue, and should IoT wait for
    a new message on the queue?

    def status_request_from_MQTT(self):
        app_queue.put({"type": "get_status"})
        try:
            current_status = app_queue.get(timeout=10)
        except Empty:
            current_status = None  # What to do?
        return current_status
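
    Reading the reply from the same queue risks the requester consuming its own request or another thread's message. A common alternative is to attach a private reply queue to each request. The sketch below assumes that pattern; app_loop, the "reply_to" key and the "stop" message are hypothetical names introduced only for illustration.

```python
import queue
import threading


def app_loop(app_queue):
    """Owns the state: the only thread that reads or writes `status`."""
    status = "OFF"
    while True:
        msg = app_queue.get()
        if msg["type"] == "new_status":
            status = msg["data"]
        elif msg["type"] == "get_status":
            # Answer on the caller's private queue, not on app_queue.
            msg["reply_to"].put(status)
        elif msg["type"] == "stop":
            break


def status_request_from_mqtt(app_queue, timeout=10):
    """Ask the app thread for the status and wait for the answer."""
    reply_queue = queue.Queue(maxsize=1)
    app_queue.put({"type": "get_status", "reply_to": reply_queue})
    try:
        return reply_queue.get(timeout=timeout)
    except queue.Empty:
        return None  # the caller decides how to report a timeout
```

    Since only app_loop touches the state, no lock is needed around it; the queues provide the synchronization.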


    Yet another approach is to avoid multithreading altogether and create a
    single "main loop" function that waits simultaneously for incoming
    events on the serial bus and MQTT (how?). I don't know if this can be
    done in my case, because I'm using the awscrt Python module, which works
    through callbacks that I think are called from another thread.
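
    A single main loop can still coexist with callbacks fired from foreign threads if each callback only hands its payload over to the loop. The sketch below assumes an asyncio event loop and uses loop.call_soon_threadsafe() for the hand-over; the foreign thread here merely simulates library callbacks, and nothing in it is awscrt API.

```python
import asyncio
import threading


async def main_loop(events, handled):
    """Single consumer: every event is handled on the event-loop thread."""
    while True:
        source, payload = await events.get()
        if source == "stop":
            break
        handled.append((source, payload))


def make_threadsafe_callback(loop, events, source):
    """Build a callback that any other thread may call safely."""
    def callback(payload):
        loop.call_soon_threadsafe(events.put_nowait, (source, payload))
    return callback


async def demo():
    loop = asyncio.get_running_loop()
    events = asyncio.Queue()
    handled = []
    on_mqtt = make_threadsafe_callback(loop, events, "mqtt")
    on_bus = make_threadsafe_callback(loop, events, "bus")
    stop = make_threadsafe_callback(loop, events, "stop")

    def foreign_thread():
        # Stands in for callbacks invoked by a library-owned thread.
        on_mqtt("read/status")
        on_bus("ON")
        stop(None)

    worker = threading.Thread(target=foreign_thread)
    worker.start()
    await main_loop(events, handled)
    worker.join()
    return handled


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

    All application state can then live in main_loop without locks, because only the event-loop thread ever touches it.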


    Any suggestions on this architecture?

    --- SoupGate-Win32 v1.05
    * Origin: fsxNet Usenet Gateway (21:1/5)
  • From risky sibam@21:1/5 to All on Fri Apr 28 05:43:09 2023
    On Friday, 28 April 2023 at 17:31:58 UTC+7, pozz wrote:

    --- SoupGate-Win32 v1.05
    * Origin: fsxNet Usenet Gateway (21:1/5)
  • From Diego Souza@21:1/5 to pozzugno@gmail.com on Sat May 13 00:00:14 2023
    Hi there,

    I hope this e-mail still reaches you in time. I have implemented this architecture a few times, and all of those implementations are still working fine today. However, your question made me review it and create a small gist.

    I suggest you create a thread for every output and input connection. This
    makes it easier to focus on reading or writing inside a given object. For example, in your project, I would separate it into IotReader, IotWriter, MqttReader, and MqttWriter. Another thing I do, to avoid juggling
    multiple locks, semaphores, and so on, is to create a central event loop.
    Everything that comes from IotReader or MqttReader is packed
    into an event and sent to a central thread. This is the main gateway
    thread, and I would call it Gateway.

    I don't know if you have ever programmed for Android, but the Android
    framework uses a similar approach to processing data. Whenever you need to process something, you start a new 'Thread', and when you need to present
    the result in the interface, you dispatch events until the main thread is notified and updates the corresponding Views. The point here is: never do
    any extensive processing in the main thread, as it will delay every other event. You will probably not need it now, but if you ever do, make a pool of workers to process this and keep the Gateway free. For CPU-heavy work, replace the Threads with multiprocessing.Process as well, because CPython's global interpreter lock prevents threads from running Python code in parallel.

    Regarding thread/process communication, I like to implement this using
    Queues. The Gateway class would have a main queue to receive events from IotReader and MqttReader. IotWriter and MqttWriter each have their own queue
    as well. Whenever the Gateway needs to send something to either of them, it just needs to put it on their respective queue, which I wrap inside a
    method, for simplicity.

    Another benefit of this architecture is the ability to scale to more connections easily. In the past, I have used this strategy to schedule
    tasks for up to about 20 devices (each with an input and output thread). I believe it could go higher, but I haven't needed to. There are fully distributed architectures more suitable for hundreds and thousands of connections, but this is likely not what you need now.

    The following is a possible implementation for the IotReader. You need to replace the AnySerialReader class and its read method with the
    initialization of your own Bus wrapper. The read method must have a timeout parameter if you want to cancel the operation properly. The terminate
    method is used to shut the thread down cleanly.

    import logging
    import sys
    import traceback
    from queue import Queue
    from threading import Thread

    log = logging.getLogger(__name__)  # assumed; the post does not show how log is set up

    class IotReader(Thread):

        def __init__(self, queue_master, name='IotReader'):
            super().__init__()
            self.queue_master = queue_master
            self.queue = Queue()
            self.done = False
            self.name = name
            self.start()

        def terminate(self):
            self.done = True

        def run(self):
            log.info(f"Starting thread for {self.name}")
            serial_reader = AnySerialReader('Serial' + self.name)
            log.info(f"Serial reader for {self.name} initialized")

            while not self.done:
                try:
                    # The timeout lets the loop re-check self.done regularly.
                    data = serial_reader.read(timeout=1)
                    if data is None:
                        continue
                    self.queue_master.put(('on_iot_event', data))
                except:
                    traceback.print_exc(file=sys.stdout)

            log.warning("Terminating IotReader")
            serial_reader.terminate()


    The following is a possible implementation for IotWriter. It adds a method named send that adds new tasks to the queue. The main loop, running inside
    the thread, waits for these events and calls write in AnySerialWriter. This
    may be a slow operation: the connection may be down, we may need to
    reconnect, and so on. This is why we need a thread for the output messages as well.

    class IotWriter(Thread):

        def __init__(self, name='IotWriter'):
            super().__init__()
            self.queue = Queue()
            self.done = False
            self.name = name
            self.start()

        def terminate(self):
            self.done = True
            self.queue.put(('terminate', None))

        def send(self, data):
            self.queue.put(('write_message', data))

        def run(self):
            log.info(f"Starting thread for {self.name}")
            serial_writer = AnySerialWriter('Serial' + self.name)
            log.info(f"Serial writer for {self.name} initialized")

            while not self.done:
                try:
                    action, data = self.queue.get()
                    if action == 'terminate':
                        break
                    elif action == 'write_message':
                        serial_writer.write(data)
                    else:
                        log.error(f'Unknown action for IotWriter - action={action}, data={data}')
                except:
                    traceback.print_exc(file=sys.stdout)

            log.warning("Terminating IotWriter")
            serial_writer.terminate()


    I do not include the source for MqttReader and MqttWriter, as they are
    very similar in structure to IotReader and IotWriter. The code below is
    for the Gateway class. It initializes all readers and writers. After
    that, it waits for input messages and processes them accordingly. You
    could add as many events as you need. I used two to illustrate.

    class Gateway(Thread):

        def __init__(self):
            super().__init__()
            self.queue_master = Queue()
            self.done = False
            self.start()

        def run(self):
            log.info("Starting Gateway")

            while not self.done:
                try:
                    self.iot_reader = IotReader(self.queue_master)
                    self.iot_writer = IotWriter()
                    self.mqtt_reader = MqttReader(self.queue_master)
                    self.mqtt_writer = MqttWriter()

                    log.info(f"Starting {self.__class__.__name__}")

                    while not self.done:
                        try:
                            action, data = self.queue_master.get()
                            if action == 'on_mqtt_event':
                                self.on_mqtt_event(data)
                            elif action == 'on_iot_event':
                                self.on_iot_event(data)
                            elif action == 'terminate':
                                break
                            else:
                                log.error(f'Unknown action, action={action}, data={data}')
                        except:
                            log.error("Error during message parsing")
                            traceback.print_exc(file=sys.stdout)
                except:
                    log.error("Error during gateway configuration")
                    traceback.print_exc(file=sys.stdout)

            # Ask every worker to stop, then wait for all of them.
            self.iot_reader.terminate()
            self.iot_writer.terminate()
            self.mqtt_reader.terminate()
            self.mqtt_writer.terminate()

            self.iot_reader.join()
            self.iot_writer.join()
            self.mqtt_reader.join()
            self.mqtt_writer.join()

            log.warning('Terminating Gateway')

        def terminate(self):
            self.done = True
            self.queue_master.put(('terminate', None))

        def on_iot_event(self, data):
            log.info(f'Event from iot device, forwarding to mqtt, data={data}')
            self.mqtt_writer.send(data)

        def on_mqtt_event(self, data):
            log.info(f'Event from mqtt, forwarding to iot, data={data}')
            self.iot_writer.send(data)


    I started the Gateway using the code below. It calls the terminate method
    when I press Ctrl+C. This event could also come from the MQTT server or anywhere else.

    gateway = Gateway()

    # Your main thread is free here. You could start a webserver and display
    # a dashboard. Or wait, like below.

    try:
        gateway.join()
    except KeyboardInterrupt:
        log.info("Sending terminate command...")
        gateway.terminate()
        try:
            gateway.join()
        except KeyboardInterrupt:
            log.info("Killing the app...")
            sys.exit(0)

    log.info("Bye!")


    If you want to check the full code, there is a small gist at the link below:
    https://gist.github.com/diegofps/87945a0c3e800c747f3af07833ff6b7e


    You also mentioned discovering the device status and sending it back
    through MQTT. I can see two approaches to this. The first is to
    cache the status emitted by the device. This is fine if the data is small enough to keep in the gateway's memory; the gateway can then answer through
    MQTT immediately. The second is to forward the request to the
    device. The device will later respond with both the original
    question and the response. You likely need the question echoed back so that the gateway knows what to do with the response without having to remember the request itself, which keeps the gateway stateless. You could also mix these
    two approaches and cache the state for a certain amount of time. After
    that, it would expire, and you would ask the device again.
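
    The mixed approach (cache with expiry) can be sketched as a small time-to-live cache. StatusCache, its method names and the injectable clock parameter are assumptions made for illustration; they are not taken from the gist.

```python
import time


class StatusCache:
    """Remembers the last status for `ttl_seconds`, then reports a miss."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so that expiry can be tested
        self._value = None
        self._stored_at = None

    def store(self, value):
        """Record a fresh status reported by the device."""
        self._value = value
        self._stored_at = self._clock()

    def get(self):
        """Return the cached status, or None if missing or expired."""
        if self._stored_at is None:
            return None
        if self._clock() - self._stored_at > self._ttl:
            return None
        return self._value
```

    On a miss, the gateway would forward the request to the device and call store() when the answer arrives.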

    This is an overview of how I implement it nowadays. I am sure other people
    may have different strategies and ideas to improve it.

    Best,
    Diego







    --
    Diego Souza
    Wespa Intelligent Systems
    Rio de Janeiro - Brasil

    --- SoupGate-Win32 v1.05
    * Origin: fsxNet Usenet Gateway (21:1/5)