‎2025 Nov 20 7:33 AM - edited ‎2025 Nov 20 2:50 PM
Hi,
I implemented a python script which connects to event mesh and subscribes to a queue.
The oauth token is retrieved successfully and the connection is established.
However every time the client tries to subscribe to a queue, the connection is closed.
This is also the case when the client tries sending a keepalive message.
This is my code:
import paho.mqtt.client as mqtt
import requests
import time
# ---------- Configuration ----------
BROKER_URL = "xxxx" # Replace with your MQTT broker URL
BROKER_PORT = 443 # Typically 8883 for TLS
TOPIC = "testOrgName/testEventMesh/1/test_queue"
# OAuth Configuration
OAUTH_TOKEN_URL = "xxx" # OAuth token endpoint
CLIENT_ID = "xxx"
CLIENT_SECRET = "xxx"
SCOPE = "mqtt.subscribe"
# -----------------------------------
def get_oauth_token():
"""Fetch a new OAuth access token"""
payload = {
'grant_type': 'client_credentials',
}
response = requests.post(OAUTH_TOKEN_URL, auth=(CLIENT_ID, CLIENT_SECRET), params=payload)
response.raise_for_status()
token = response.json().get("access_token")
return token
# MQTT Callbacks
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected successfully!")
#client.subscribe(TOPIC)
#client.publish(TOPIC, "Hello, MQTT!", qos=1)
print(f"Subscribed to topic: {TOPIC}")
else:
print(f"Connection failed with code {rc}")
def on_message(client, userdata, msg):
print(f"Received message: {msg.payload.decode()} on topic {msg.topic}")
def on_disconnect(client, userdata, rc):
print(f"lost connection with reason {rc}")
def on_log(client, userdata, level, buf):
print(f"{level}: {buf}")
def main():
# Get OAuth token
token = get_oauth_token()
print("Obtained OAuth token")
# Create MQTT client
client = mqtt.Client(transport="websockets", client_id="xyz")
# Assign callbacks
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.on_log = on_log
# maybe not even necessary
client.tls_set(ca_certs="./_.cf.eu10.hana.ondemand.com")
client.ws_set_options(path="/protocols/mqtt311ws", headers={"Authorization": f"Bearer {token}"})
# Connect to broker
client.connect(BROKER_URL, BROKER_PORT)
# Start network loop
client.loop_start()
try:
while True:
# Keep running or refresh token periodically if needed
time.sleep(10)
except KeyboardInterrupt:
print("Disconnecting...")
client.loop_stop()
client.disconnect()
if __name__ == "__main__":
main()I have tried setting a different client id and I have verified that the topic coincides with the queue name and the queue naming rules in the service descriptor. I have also verified, that enough connections are open (there is no other connection). I also have verified, that the client is indeed connected to event mesh using the resource usage where it is, for some reason, marked as a producer.
My suspicion now is, that for some reason i have to enable the mqtt functionality or at least give some permissions for mqtt or something.
what would also be helpful would be something where i can see the logs of the mqtt broker in event mesh or at least any kind of logs.
-- EDIT --
I have found out, that for some reason the standalone event mesh and the integration suite event mesh behave differently. Using exactly the same script, subscribing to a topic does not disconnect the client and messages can be received, as long as messages are solely published to a topic in event mesh integration suite. I have no knowledge about subscribing to a queue.