I am trying to fetch data from Cassandra from a specific table and trying to insert it into another table in Cassandra after making some changes. Both the tables are located in keyspace "test". When I am trying to get the data from the first table everything works fine and it is able to fetch the data. However, in the future handler which handles the output of the first query, I am trying to insert the data into another table under the same Cassandra instance and it is gettingting failed. I am getting an error from the application stating "cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])" . I am not sure where I am going wrong
import threading
from threading import Event
from cassandra.query import SimpleStatement
from cassandra.cluster import Cluster
hosts=['127.0.0.1']
keyspace="test"
thread_local = threading.local()
cluster_ = Cluster(hosts)
def get_session():
if hasattr(thread_local, "cassandra_session"):
print("got session from threadlocal")
return thread_local.cassandra_session
print(" Connecting to Cassandra Host " + str(hosts))
session_ = cluster_.connect(keyspace)
print(" Connecting and creating session to Cassandra KeySpace " + keyspace)
thread_local.cassandra_session = session_
return session_
class PagedResultHandler(object):
def __init__(self, future):
self.error = None
self.finished_event = Event()
self.future = future
self.future.add_callbacks(
callback=self.handle_page,
errback=self.handle_error)
def handle_page(self, rows):
for row in rows:
process_row(row)
if self.future.has_more_pages:
self.future.start_fetching_next_page()
else:
self.finished_event.set()
def handle_error(self, exc):
self.error = exc
self.finished_event.set()
def process_row(row):
print(row)
session_ = get_session()
stmt = session_.prepare(
"INSERT INTO test.data(customer,snr,rttt, event_time) VALUES (?,?,?,?)")
results = session_.execute(stmt,
[row.customer, row.snr, row.rttt,row.created_time])
print("Done")
session = get_session()
query = "select * from test.data_log"
statement = SimpleStatement(query, fetch_size=1000)
future = session.execute_async(statement)
handler = PagedResultHandler(future)
handler.finished_event.wait()
if handler.error:
raise handler.error
cluster_.shutdown()
However, when I try to execute the python file the application is throwing an error "cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])" from getSession() call from "process_row" method. Clearly, the first call to Cassandra is getting succeeded without any issues. There is no connectivity issue and the Cassandra instance is running fine locally. I am able to query the data using cqlsh. If I call the process_row method outside the future handler everything is working fine, I am not sure what needs to be done to make it happen from the Future Handler.
Connecting to Cassandra Host ['127.0.0.1']
Connecting and creating session to Cassandra KeySpace test
Row(customer='abcd', snr=100, rttt=121, created_time=datetime.datetime(2020, 8, 8, 2, 26, 51))
Connecting to Cassandra Host ['127.0.0.1']
Traceback (most recent call last):
File "test/check.py", , in <module>
raise handler.error
File "cassandra/cluster.py", line 4579, in cassandra.cluster.ResponseFuture._set_result
File "cassandra/cluster.py", line 4777, in cassandra.cluster.ResponseFuture._set_final_result
File "test/check.py"", in handle_page
process_row(row)
File "test/check.py"", in process_row
session_ = get_session()
File "/test/check.py"", in get_session
session_ = cluster_.connect(keyspace)
File "cassandra/cluster.py", line 1715, in cassandra.cluster.Cluster.connect
File "cassandra/cluster.py", line 1772, in cassandra.cluster.Cluster._new_session
File "cassandra/cluster.py", line 2553, in cassandra.cluster.Session.__init__
cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])
Process finished with exit code 1
from Python3: cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1']) when using execute_async future
No comments:
Post a Comment