C++中的Actor模型是一種并發編程模型,它通過將計算單元封裝為獨立、可并發執行的Actor實例,來實現并發和消息傳遞。每個Actor都有自己的狀態和行為,并且通過接收和發送消息來進行通信。Actor之間是完全隔離的,它們之間只能通過消息進行通信,從而避免了共享狀態和顯式鎖的問題。
下面是一個簡單的C++ Actor模型的示例實現:
#include <IOStream>
#include <queue>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>
class Actor {
public:
void run() {
while (!stop) {
std::function<void()> message;
{
std::unique_lock<std::mutex> lock(mutex);
condition.wAIt(lock, [this]() {
return stop || !messageQueue.empty();
});
if (stop && messageQueue.empty()) {
return;
}
message = std::move(messageQueue.front());
messageQueue.pop();
}
message();
}
}
template<typename F, typename... Args>
void send(F&& f, Args&&... args) {
{
std::lock_guard<std::mutex> lock(mutex);
messageQueue.emplace(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
}
condition.notify_one();
}
void stopActor() {
{
std::lock_guard<std::mutex> lock(mutex);
stop = true;
}
condition.notify_all();
}
private:
std::queue<std::function<void()>> messageQueue;
std::mutex mutex;
std::condition_variable condition;
bool stop = false;
};
// 示例Actor行為
class MyActor {
public:
void doSomething(int id) {
std::cout << "Actor ID: " << id << ", thread ID: " << std::this_thread::get_id() << std::endl;
// 具體行為邏輯
}
};
int main() {
MyActor actor;
Actor actingThread;
actingThread.send([&actor]() {
actor.doSomething(1);
});
actingThread.send([&actor]() {
actor.doSomething(2);
});
actingThread.send([&actingThread]() {
actingThread.stopActor();
});
std::thread thread([&actingThread]() {
actingThread.run();
});
thread.join();
return 0;
}
上述代碼實現了一個簡單的Actor模型。在Actor類中,使用條件變量和互斥鎖來實現Actor的同步和通信。每個Actor都維護一個消息隊列,通過send方法向隊列中添加消息。
在main函數中,示例代碼創建了一個MyActor對象并實例化了一個Actor對象。通過調用Actor對象的send方法,將消息發送給MyActor對象。MyActor對象中的doSomething方法會在調用線程的上下文中執行。
然后,通過調用stopActor方法停止Actor線程。最后,創建一個新線程運行Actor的run方法來處理消息隊列中的消息。
請注意,上述示例代碼只是一個簡單的Actor模型的實現,實際使用時可能需要考慮更多的細節,如Actor之間的連接、創建多個Actor對象、消息路由等。Actor模型可以作為實現并發和并行的一種有趣和強大的方式,它可以提供更好的抽象和隔離性來處理并發編程中的問題。
添加actor之間的連接功能
要實現Actor之間的連接功能,可以使用一個中央調度器來管理Actor之間的消息傳遞和通信。下面是一個示例的C++ Actor模型的實現,包括Actor之間的連接功能:
#include <iostream>
#include <queue>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
class Actor {
public:
void run() {
while (!stop) {
std::function<void()> message;
{
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock, [this]() {
return stop || !messageQueue.empty();
});
if (stop && messageQueue.empty()) {
return;
}
message = std::move(messageQueue.front());
messageQueue.pop();
}
message();
}
}
template<typename F, typename... Args>
void send(F&& f, Args&&... args) {
{
std::lock_guard<std::mutex> lock(mutex);
messageQueue.emplace(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
}
condition.notify_one();
}
void stopActor() {
{
std::lock_guard<std::mutex> lock(mutex);
stop = true;
}
condition.notify_all();
}
private:
std::queue<std::function<void()>> messageQueue;
std::mutex mutex;
std::condition_variable condition;
bool stop = false;
};
class ActorSystem {
public:
ActorSystem() {
std::thread thread([this]() {
this->run();
});
thread.detach();
}
template<typename T>
Actor& createActor() {
std::unique_lock<std::mutex> lock(mutex);
Actor* actor = new Actor();
actors.emplace(actorId, actor);
actorId++;
return *actor;
}
void sendMessage(int actorId, std::function<void()> message) {
std::unique_lock<std::mutex> lock(mutex);
if (actors.find(actorId) != actors.end()) {
actors[actorId]->send(message);
}
}
void stopActor(int actorId) {
std::unique_lock<std::mutex> lock(mutex);
if (actors.find(actorId) != actors.end()) {
actors[actorId]->stopActor();
delete actors[actorId];
actors.erase(actorId);
}
}
private:
void run() {
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock(mutex);
for (const auto& actor : actors) {
actor.second->run();
}
}
}
std::unordered_map<int, Actor*> actors;
int actorId = 0;
std::mutex mutex;
};
// 示例Actor行為
class MyActor {
public:
MyActor(ActorSystem& actorSystem) : actorSystem(actorSystem) {}
void doSomething(int id) {
std::cout << "Actor ID: " << id << ", thread ID: " << std::this_thread::get_id() << std::endl;
// 具體行為邏輯
// 發送消息給其他Actor
actorSystem.sendMessage(1, [this]() {
this->doSomethingElse();
});
}
void doSomethingElse() {
std::cout << "Doing something else" << std::endl;
// 具體行為邏輯
}
private:
ActorSystem& actorSystem;
};
int main() {
ActorSystem actorSystem;
MyActor actor1(actorSystem);
MyActor actor2(actorSystem);
Actor& actor1Ref = actorSystem.createActor<MyActor>();
Actor& actor2Ref = actorSystem.createActor<MyActor>();
actor1Ref.send([&actor1]() {
actor1.doSomething(1);
});
actor2Ref.send([&actor2]() {
actor2.doSomething(2);
});
std::this_thread::sleep_for(std::chrono::seconds(5));
actorSystem.stopActor(0);
actorSystem.stopActor(1);
return 0;
}
上述代碼中,我們引入了一個ActorSystem類來管理所有的Actor。ActorSystem類中維護了一個unordered_map來存儲Actor的ID和指針的映射關系。createActor方法用于創建新的Actor,并返回其引用。sendMessage方法用于向指定的Actor發送消息,stopActor方法用于停止指定的Actor。
在MyActor類中,我們添加了一個doSomethingElse方法,用于演示如何向其他Actor發送消息。在doSomething方法中,我們通過actorSystem.sendMessage方法向ID為1的Actor發送消息。
在main函數中,我們創建了兩個MyActor對象,并通過createActor方法創建了兩個對應的Actor。然后,我們通過調用send方法向兩個Actor發送消息。最后,我們通過調用stopActor方法停止兩個Actor。
請注意,上述示例代碼只是一個簡單的Actor模型的實現,實際使用時可能需要考慮更多的細節,如消息路由、Actor之間的連接關系、消息隊列的優化等。Actor模型是一種強大的并發編程模型,可以提供更好的抽象和隔離性來處理并發編程中的問題。
再添加消息路由功能
要添加消息路由功能,我們可以為ActorSystem類添加一個Router類成員變量,并在sendMessage方法中使用路由器來選擇目標Actor。下面是更新后的代碼:
class Router {
public:
virtual int route(const std::unordered_map<int, Actor*>& actors) = 0;
};
class RoundRobinRouter : public Router {
public:
int route(const std::unordered_map<int, Actor*>& actors) override {
if (actors.empty()) {
return -1;
}
currentActorIndex = (currentActorIndex + 1) % actors.size();
auto it = std::next(actors.begin(), currentActorIndex);
return it->first;
}
private:
int currentActorIndex = -1;
};
class ActorSystem {
public:
ActorSystem() {
std::thread thread([this]() {
this->run();
});
thread.detach();
}
template<typename T>
Actor& createActor() {
std::unique_lock<std::mutex> lock(mutex);
Actor* actor = new Actor();
actors.emplace(actorId, actor);
actorId++;
return *actor;
}
void sendMessage(std::function<void()> message, Router* router = nullptr) {
std::unique_lock<std::mutex> lock(mutex);
int actorId = -1;
if (router != nullptr) {
actorId = router->route(actors);
} else {
if (!actors.empty()) {
actorId = actors.begin()->first;
}
}
if (actorId != -1) {
actors[actorId]->send(message);
}
}
void stopActor(int actorId) {
std::unique_lock<std::mutex> lock(mutex);
if (actors.find(actorId) != actors.end()) {
actors[actorId]->stopActor();
delete actors[actorId];
actors.erase(actorId);
}
}
private:
void run() {
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock(mutex);
for (const auto& actor : actors) {
actor.second->run();
}
}
}
std::unordered_map<int, Actor*> actors;
int actorId = 0;
std::mutex mutex;
};
在上述代碼中,我們添加了一個Router類和一個具體的RoundRobinRouter類。Router類是一個抽象類,其中定義了一個route方法,用于根據一組Actor選擇目標Actor。RoundRobinRouter類是一個具體的路由器實現,它使用輪詢的方式選擇目標Actor。
在ActorSystem類中,我們添加了一個可選的路由器參數router,并在sendMessage方法中使用路由器來選擇目標Actor。如果沒有指定路由器,則默認選擇第一個Actor。
你可以根據需要實現其他的路由器策略,比如隨機選擇、基于消息負載的選擇等。
使用示例代碼:
int main() {
ActorSystem actorSystem;
MyActor actor1(actorSystem);
MyActor actor2(actorSystem);
Actor& actor1Ref = actorSystem.createActor<MyActor>();
Actor& actor2Ref = actorSystem.createActor<MyActor>();
RoundRobinRouter router;
actorSystem.sendMessage([&actor1]() {
actor1.doSomething(1);
}, &router);
actorSystem.sendMessage([&actor2]() {
actor2.doSomething(2);
}, &router);
std::this_thread::sleep_for(std::chrono::seconds(5));
actorSystem.stopActor(0);
actorSystem.stopActor(1);
return 0;
}
在上述示例代碼中,我們創建了一個RoundRobinRouter對象,并將其作為參數傳遞給sendMessage方法。這樣,消息將根據輪詢策略路由到不同的Actor上。
請注意,上述示例代碼僅僅是一個簡單的消息路由實現,實際使用時可能需要考慮更多的細節,比如路由策略的靈活性、動態添加/刪除Actor時的路由更新等。
進一步優化路由策略的靈活性
要優化路由策略的靈活性,我們可以將路由策略從Router類中分離出來,并使用策略模式來實現。下面是更新后的代碼:
class RoutingStrategy {
public:
virtual int selectActor(const std::unordered_map<int, Actor*>& actors) = 0;
};
class Router {
public:
Router(RoutingStrategy* strategy) : strategy(strategy) {}
int route(const std::unordered_map<int, Actor*>& actors) {
return strategy->selectActor(actors);
}
private:
RoutingStrategy* strategy;
};
class RoundRobinStrategy : public RoutingStrategy {
public:
int selectActor(const std::unordered_map<int, Actor*>& actors) override {
if (actors.empty()) {
return -1;
}
currentActorIndex = (currentActorIndex + 1) % actors.size();
auto it = std::next(actors.begin(), currentActorIndex);
return it->first;
}
private:
int currentActorIndex = -1;
};
class RandomStrategy : public RoutingStrategy {
public:
int selectActor(const std::unordered_map<int, Actor*>& actors) override {
if (actors.empty()) {
return -1;
}
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(0, actors.size() - 1);
auto it = std::next(actors.begin(), dis(gen));
return it->first;
}
};
class ActorSystem {
public:
ActorSystem() {
std::thread thread([this]() {
this->run();
});
thread.detach();
}
template<typename T>
Actor& createActor() {
std::unique_lock<std::mutex> lock(mutex);
Actor* actor = new Actor();
actors.emplace(actorId, actor);
actorId++;
return *actor;
}
void sendMessage(std::function<void()> message, RoutingStrategy* strategy = nullptr) {
std::unique_lock<std::mutex> lock(mutex);
int actorId = -1;
if (strategy != nullptr) {
actorId = strategy->selectActor(actors);
} else {
if (!actors.empty()) {
actorId = actors.begin()->first;
}
}
if (actorId != -1) {
actors[actorId]->send(message);
}
}
void stopActor(int actorId) {
std::unique_lock<std::mutex> lock(mutex);
if (actors.find(actorId) != actors.end()) {
actors[actorId]->stopActor();
delete actors[actorId];
actors.erase(actorId);
}
}
private:
void run() {
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock(mutex);
for (const auto& actor : actors) {
actor.second->run();
}
}
}
std::unordered_map<int, Actor*> actors;
int actorId = 0;
std::mutex mutex;
};
在上述代碼中,我們將路由策略從Router類中分離出來,并定義了一個RoutingStrategy抽象類。RoutingStrategy類中定義了一個selectActor方法,用于根據一組Actor選擇目標Actor。
然后,我們實現了兩個具體的路由策略類:RoundRobinStrategy和RandomStrategy。RoundRobinStrategy類使用輪詢的方式選擇目標Actor,RandomStrategy類使用隨機選擇的方式選擇目標Actor。
在Router類中,我們使用了策略模式,將具體的路由策略對象存儲在成員變量中,并在route方法中調用策略對象的selectActor方法。
在ActorSystem類中,我們添加了一個可選的策略參數strategy,并在sendMessage方法中使用策略對象來選擇目標Actor。如果沒有指定策略對象,則默認選擇第一個Actor。
使用示例代碼:
int main() {
ActorSystem actorSystem;
MyActor actor1(actorSystem);
MyActor actor2(actorSystem);
Actor& actor1Ref = actorSystem.createActor<MyActor>();
Actor& actor2Ref = actorSystem.createActor<MyActor>();
RoundRobinStrategy roundRobinStrategy;
RandomStrategy randomStrategy;
Router roundRobinRouter(&roundRobinStrategy);
Router randomRouter(&randomStrategy);
actorSystem.sendMessage([&actor1]() {
actor1.doSomething(1);
}, &roundRobinRouter);
actorSystem.sendMessage([&actor2]() {
actor2.doSomething(2);
}, &randomRouter);
std::this_thread::sleep_for(std::chrono::seconds(5));
actorSystem.stopActor(0);
actorSystem.stopActor(1);
return 0;
}
在上述示例代碼中,我們創建了一個RoundRobinStrategy對象和一個RandomStrategy對象,并將它們分別傳遞給RoundRobinRouter和RandomRouter對象。這樣,消息將根據不同的策略路由到不同的Actor上。
通過使用策略模式,我們可以很容易地添加新的路由策略,只需實現一個新的策略類,并將其傳遞給Router對象即可。這樣,我們可以根據需要靈活地選擇不同的路由策略。
動態添加/刪除Actor時的路由更新
當動態添加或刪除Actor時,需要更新路由策略,以確保新添加的Actor也能夠接收到消息。
對于動態添加Actor,我們可以在ActorSystem類中添加一個addActor方法,用于向actors容器中添加新的Actor。在添加完Actor之后,我們需要更新路由策略對象中的Actor列表。
對于動態刪除Actor,我們可以在ActorSystem類中添加一個removeActor方法,用于從actors容器中刪除指定的Actor。在刪除完Actor之后,我們同樣需要更新路由策略對象中的Actor列表。
下面是更新后的ActorSystem類的代碼:
class ActorSystem {
public:
ActorSystem() {
std::thread thread([this]() {
this->run();
});
thread.detach();
}
template<typename T>
Actor& createActor() {
std::unique_lock<std::mutex> lock(mutex);
Actor* actor = new Actor();
actors.emplace(actorId, actor);
actorId++;
// Update routing strategy
for (auto& strategy : routingStrategies) {
strategy->updateActors(actors);
}
return *actor;
}
void addActor(int actorId, Actor* actor) {
std::unique_lock<std::mutex> lock(mutex);
actors.emplace(actorId, actor);
// Update routing strategy
for (auto& strategy : routingStrategies) {
strategy->updateActors(actors);
}
}
void removeActor(int actorId) {
std::unique_lock<std::mutex> lock(mutex);
if (actors.find(actorId) != actors.end()) {
actors.erase(actorId);
// Update routing strategy
for (auto& strategy : routingStrategies) {
strategy->updateActors(actors);
}
}
}
void sendMessage(std::function<void()> message, RoutingStrategy* strategy = nullptr) {
std::unique_lock<std::mutex> lock(mutex);
int actorId = -1;
if (strategy != nullptr) {
actorId = strategy->selectActor(actors);
} else {
if (!actors.empty()) {
actorId = actors.begin()->first;
}
}
if (actorId != -1) {
actors[actorId]->send(message);
}
}
void stopActor(int actorId) {
std::unique_lock<std::mutex> lock(mutex);
if (actors.find(actorId) != actors.end()) {
actors[actorId]->stopActor();
delete actors[actorId];
actors.erase(actorId);
}
}
void addRoutingStrategy(RoutingStrategy* strategy) {
routingStrategies.push_back(strategy);
strategy->updateActors(actors);
}
private:
void run() {
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock(mutex);
for (const auto& actor : actors) {
actor.second->run();
}
}
}
std::unordered_map<int, Actor*> actors;
std::vector<RoutingStrategy*> routingStrategies;
int actorId = 0;
std::mutex mutex;
};
在更新后的ActorSystem類中,我們添加了一個routingStrategies成員變量,用于存儲所有的路由策略對象。在添加或刪除Actor時,我們需要遍歷routingStrategies并調用每個策略對象的updateActors方法,以更新Actor列表。
此外,我們還添加了一個addRoutingStrategy方法,用于向routingStrategies中添加新的路由策略對象。在添加完路由策略對象之后,我們同樣需要調用updateActors方法來更新Actor列表。
使用示例代碼:
int main() {
ActorSystem actorSystem;
MyActor actor1(actorSystem);
MyActor actor2(actorSystem);
Actor& actor1Ref = actorSystem.createActor<MyActor>();
Actor& actor2Ref = actorSystem.createActor<MyActor>();
RoundRobinStrategy roundRobinStrategy;
RandomStrategy randomStrategy;
actorSystem.addRoutingStrategy(&roundRobinStrategy);
actorSystem.addRoutingStrategy(&randomStrategy);
actorSystem.sendMessage([&actor1]() {
actor1.doSomething(1);
}, &roundRobinStrategy);
actorSystem.sendMessage([&actor2]() {
actor2.doSomething(2);
}, &randomStrategy);
std::this_thread::sleep_for(std::chrono::seconds(5));
actorSystem.stopActor(0);
actorSystem.stopActor(1);
return 0;
}
在上述示例代碼中,我們首先創建了兩個MyActor對象,并將它們添加到ActorSystem中。然后,我們創建了一個RoundRobinStrategy對象和一個RandomStrategy對象,并將它們分別添加到ActorSystem中。
接下來,我們使用sendMessage方法向不同的路由策略發送消息。在發送消息之前,我們需要將路由策略對象傳遞給sendMessage方法。
最后,我們停止了兩個Actor,并結束了程序。在停止Actor之后,我們需要調用stopActor方法,并在ActorSystem中刪除相應的Actor。
完整代碼
#include <iostream>
#include <thread>
#include <chrono>
#include <unordered_map>
#include <functional>
#include <mutex>
class Actor {
public:
Actor() {}
void send(std::function<void()> message) {
messages.push_back(message);
}
void run() {
std::unique_lock<std::mutex> lock(mutex);
for (const auto& message : messages) {
message();
}
messages.clear();
}
void stopActor() {
std::unique_lock<std::mutex> lock(mutex);
stopped = true;
}
private:
std::vector<std::function<void()>> messages;
std::mutex mutex;
bool stopped = false;
};
class RoutingStrategy {
public:
virtual int selectActor(const std::unordered_map<int, Actor*>& actors) = 0;
virtual void updateActors(const std::unordered_map<int, Actor*>& actors) = 0;
};
class RoundRobinStrategy : public RoutingStrategy {
public:
int selectActor(const std::unordered_map<int, Actor*>& actors) override {
if (actors.empty()) {
return -1;
}
if (currentActor >= actors.size()) {
currentActor = 0;
}
auto it = actors.begin();
std::advance(it, currentActor);
currentActor++;
return it->first;
}
void updateActors(const std::unordered_map<int, Actor*>& actors) override {
this->actors = actors;
}
private:
std::unordered_map<int, Actor*> actors;
int currentActor = 0;
};
class RandomStrategy : public RoutingStrategy {
public:
int selectActor(const std::unordered_map<int, Actor*>& actors) override {
if (actors.empty()) {
return -1;
}
int randomIndex = rand() % actors.size();
auto it = actors.begin();
std::advance(it, randomIndex);
return it->first;
}
void updateActors(const std::unordered_map<int, Actor*>& actors) override {
this->actors = actors;
}
private:
std::unordered_map<int, Actor*> actors;
};
class ActorSystem {
public:
template<typename ActorType>
ActorType& createActor() {
std::unique_lock<std::mutex> lock(mutex);
ActorType* actor = new ActorType(*this);
actors.emplace(actorId, actor);
actorId++;
// Update routing strategy
for (auto& strategy : routingStrategies) {
strategy->updateActors(actors);
}
return *actor;
}
void addActor(int actorId, Actor* actor) {
std::unique_lock<std::mutex> lock(mutex);
actors.emplace(actorId, actor);
// Update routing strategy
for (auto& strategy : routingStrategies) {
strategy->updateActors(actors);
}
}
void removeActor(int actorId) {
std::unique_lock<std::mutex> lock(mutex);
if (actors.find(actorId) != actors.end()) {
actors.erase(actorId);
// Update routing strategy
for (auto& strategy : routingStrategies) {
strategy->updateActors(actors);
}
}
}
void sendMessage(std::function<void()> message, RoutingStrategy* strategy = nullptr) {
std::unique_lock<std::mutex> lock(mutex);
int actorId = -1;
if (strategy != nullptr) {
actorId = strategy->selectActor(actors);
} else {
if (!actors.empty()) {
actorId = actors.begin()->first;
}
}
if (actorId != -1) {
actors[actorId]->send(message);
}
}
void stopActor(int actorId) {
std::unique_lock<std::mutex> lock(mutex);
if (actors.find(actorId) != actors.end()) {
actors[actorId]->stopActor();
delete actors[actorId];
actors.erase(actorId);
}
}
void addRoutingStrategy(RoutingStrategy* strategy) {
routingStrategies.push_back(strategy);
strategy->updateActors(actors);
}
private:
void run() {
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock(mutex);
for (const auto& actor : actors) {
actor.second->run();
}
}
}
std::unordered_map<int, Actor*> actors;
std::vector<RoutingStrategy*> routingStrategies;
int actorId = 0;
std::mutex mutex;
};
class MyActor {
public:
MyActor(ActorSystem& actorSystem) : actorSystem(actorSystem) {
actorSystem.addActor(actorId, this);
}
void doSomething(int value) {
std::cout << "Actor " << actorId << " is doing something with value " << value << std::endl;
}
private:
int actorId;
ActorSystem& actorSystem;
};
int main() {
ActorSystem actorSystem;
MyActor actor1(actorSystem);
MyActor actor2(actorSystem);
Actor& actor1Ref = actorSystem.createActor<MyActor>();
Actor& actor2Ref = actorSystem.createActor<MyActor>();
RoundRobinStrategy roundRobinStrategy;
RandomStrategy randomStrategy;
actorSystem.addRoutingStrategy(&roundRobinStrategy);
actorSystem.addRoutingStrategy(&randomStrategy);
actorSystem.sendMessage([&actor1]() {
actor1.doSomething(1);
}, &roundRobinStrategy);
actorSystem.sendMessage([&actor2]() {
actor2.doSomething(2);
}, &randomStrategy);
std::this_thread::sleep_for(std::chrono::seconds(5));
actorSystem.stopActor(0);
actorSystem.stopActor(1);
return 0;
}