
Using an MQTT client with Event Mesh (Python)

schmeli

Hi,
I implemented a Python script that connects to Event Mesh and subscribes to a queue.
The OAuth token is retrieved successfully and the connection is established.
However, every time the client tries to subscribe to a queue, the connection is closed.
The same happens when the client sends a keepalive message.
This is my code:

import paho.mqtt.client as mqtt
import requests
import time

# ---------- Configuration ----------
BROKER_URL = "xxxx"      # Replace with your MQTT broker host name
BROKER_PORT = 443        # 443 for MQTT over secure WebSockets (8883 is typical for plain MQTT over TLS)
TOPIC = "testOrgName/testEventMesh/1/test_queue"

# OAuth Configuration
OAUTH_TOKEN_URL = "xxx"  # OAuth token endpoint
CLIENT_ID = "xxx"
CLIENT_SECRET = "xxx"
SCOPE = "mqtt.subscribe"

# -----------------------------------

def get_oauth_token():
    """Fetch a new OAuth access token"""
    payload = {
        'grant_type': 'client_credentials',
    }
    response = requests.post(OAUTH_TOKEN_URL, auth=(CLIENT_ID, CLIENT_SECRET), params=payload)
    response.raise_for_status()
    token = response.json().get("access_token")
    return token

# MQTT Callbacks
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected successfully!")
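        # subscribe/publish are commented out here; enabling subscribe
        # is what closes the connection (see the description above)
        #client.subscribe(TOPIC)
        #client.publish(TOPIC, "Hello, MQTT!", qos=1)
        print(f"Topic to subscribe to: {TOPIC}")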
    else:
        print(f"Connection failed with code {rc}")

def on_message(client, userdata, msg):
    print(f"Received message: {msg.payload.decode()} on topic {msg.topic}")

def on_disconnect(client, userdata, rc):
    print(f"lost connection with reason {rc}")

def on_log(client, userdata, level, buf):
    print(f"{level}: {buf}")

def main():
    # Get OAuth token
    token = get_oauth_token()
    print("Obtained OAuth token")

    # Create MQTT client
    client = mqtt.Client(transport="websockets", client_id="xyz")

    # Assign callbacks
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_log = on_log

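    # Custom CA file; possibly not necessary, since tls_set() without
    # arguments falls back to the system's default CA certificates
    client.tls_set(ca_certs="./_.cf.eu10.hana.ondemand.com")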
    client.ws_set_options(path="/protocols/mqtt311ws", headers={"Authorization": f"Bearer {token}"})

    # Connect to broker
    client.connect(BROKER_URL, BROKER_PORT)

    # Start network loop
    client.loop_start()

    try:
        while True:
            # Keep running or refresh token periodically if needed
            time.sleep(10)
    except KeyboardInterrupt:
        print("Disconnecting...")
        client.loop_stop()
        client.disconnect()

if __name__ == "__main__":
    main()
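
The loop in the script only notes that the token might need refreshing. Because the bearer token is sent once in the WebSocket header at connect time, a reconnect after expiry would need a fresh token. Below is a minimal sketch of the expiry check, assuming the token response carries the standard OAuth 2.0 `expires_in` field (the script above does not read it, so this is an assumption). The helper name `token_expired` and the 60 second margin are hypothetical.

```python
import time


def token_expired(issued_at, expires_in, now=None, margin=60.0):
    """Return True once the token is within `margin` seconds of expiring.

    issued_at  -- time.time() value recorded when the token was fetched
    expires_in -- token lifetime in seconds (assumed to come from the
                  "expires_in" field of the token response)
    now        -- current time; defaults to time.time()
    margin     -- hypothetical safety margin before the real expiry
    """
    if now is None:
        now = time.time()
    return now >= issued_at + expires_in - margin
```

One way to use it would be to call it inside the `while True` loop and, when it returns True, fetch a new token, pass a new header to `ws_set_options`, and reconnect. Whether Event Mesh drops an open connection when the token expires is not something the post establishes.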

I have tried setting a different client ID, and I have verified that the topic matches the queue name and the queue naming rules in the service descriptor. I have also verified that enough connections are available (there is no other connection). Finally, I have verified via the resource usage view that the client is indeed connected to Event Mesh, where it is, for some reason, marked as a producer.

My suspicion now is that I have to enable the MQTT functionality somewhere, or at least grant some permissions for MQTT.

It would also be helpful to have a way to see the logs of the MQTT broker in Event Mesh, or at least any kind of logs.
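
Without broker logs, the client side can still be made more talkative. The sketch below decodes the return codes that MQTT 3.1.1 defines for CONNACK and SUBACK, and adds an `on_subscribe` callback using the paho-mqtt 1.x signature that matches the other callbacks in the script. The helper names `describe_connack` and `describe_suback` are hypothetical. Nothing here is specific to Event Mesh, and if the broker closes the connection without sending a SUBACK, the callback will never fire.

```python
# CONNACK return codes defined by MQTT 3.1.1
CONNACK_CODES = {
    0: "connection accepted",
    1: "unacceptable protocol version",
    2: "client identifier rejected",
    3: "server unavailable",
    4: "bad user name or password",
    5: "not authorized",
}

# SUBACK return code that signals a refused subscription in MQTT 3.1.1
SUBACK_FAILURE = 0x80


def describe_connack(rc):
    """Translate a CONNACK return code into readable text."""
    return CONNACK_CODES.get(rc, f"unknown CONNACK code {rc}")


def describe_suback(granted_qos):
    """Translate each SUBACK return code into readable text."""
    results = []
    for code in granted_qos:
        if code == SUBACK_FAILURE:
            results.append("subscription refused")
        elif code in (0, 1, 2):
            results.append(f"granted QoS {code}")
        else:
            results.append(f"unexpected SUBACK code {code}")
    return results


def on_subscribe(client, userdata, mid, granted_qos):
    """paho-mqtt 1.x style callback, invoked when a SUBACK arrives."""
    for line in describe_suback(granted_qos):
        print(f"SUBACK mid={mid}: {line}")
```

To hook it up, assign `client.on_subscribe = on_subscribe` next to the other callbacks and print `describe_connack(rc)` in `on_connect`. Note that in paho-mqtt 1.x the `rc` passed to `on_disconnect` is a client library error code, not a CONNACK code; `mqtt.error_string(rc)` turns it into text.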

 

-- EDIT --

I have found out that the standalone Event Mesh and the Integration Suite Event Mesh behave differently. With exactly the same script run against Event Mesh in Integration Suite, subscribing to a topic does not disconnect the client, and messages are received as long as they are published only to a topic. I do not know how subscribing to a queue behaves there.
