twitter-stream.py

class FilteredStream

  • POST, DELETE /2/tweets/search/stream/rules

    def add_rule(self, data: dict) -> json:
      return self.api(
        method="POST", endpoint="stream/rules", data=data
      ).json()
    
  • GET /2/tweets/search/stream/rules

    def get_rules(self) -> json:
      return self.api(
        method="GET", endpoint="stream/rules"
      ).json()
    
  • DELETE

    def delete_rule(self, data: dict) -> json:
      return self.api(
        method="POST", endpoint="stream/rules", data=data
      )
    
  • GET GET /2/tweets/search/stream

    def connect(self):
      try:
        response = self.api(
          "GET", endpoint="stream", stream=True
        )
        response.raise_for_status()
        for response_lines in response.iter_lines():
            if response_lines:
                yield json.loads(response_lines)
      except Exception as e:
          raise e