飙血推荐
  • MySQL教程
  • HTML教程
  • JavaScript基础教程
  • JavaScript正则表达式运用
  • php入门教程
  • Excel函数教程
  • AngularJS教程
  • UEditor使用文档
  • ThinkPHP5.0教程

NetCore框架WTM的分表分库实现-

时间:2022-06-10  作者:xuejiaming  

介绍

本期主角:

  • ShardingCore 一款ef-core下高性能、轻量级针对分表分库读写分离的解决方案,具有零依赖、零学习成本、零业务代码入侵
  • WTM 域名框架(简称WTM)是基于.net core的快速开发框架。支持Layui(前后端不分离), React(前后端分离),VUE(前后端分离),内置代码生成器,最大程度的提高开发效率,是一款高效开发的利器。

ShardingCore最新版本针对路由有了极大性能的优化由原先的Expression改成自定义的RouteExpression去除了Compile带来的性能损耗

我不是efcore怎么办

这边肯定有小伙伴要问有没有不是efcore的,我这边很确信的和你讲有并且适应所有的域名包括sqlhelper
ShardingConnector 一款基于域名下的高性能分表分库解决方案目前已有demo案例,这个框架你可以认为是.Net版本的ShardingSphere但是目前仅实现了ShardingSphere-JDBC,后续我将会实现ShardingSphere-Proxy希望各位.Neter多多关注

背景

之前我不是发了一篇博客吗.Net分表分库动态化处理 下面有个小伙伴留言,希望可以让我支持一下WTM 框架。我心想着处于对自己的框架的自信,并且之前有过对abpvnexfurion等一系列框架的兼容适配的尝试,原则上将只要你是efcore那么基本上都可以支持,所以秉着尝试以下的态度这边就上手了,先说下结论就是可以支持,完美不完美不清楚因为本人这个框架用的不多不知道是否是完美适配。

原理

ShardingCore

ShardingCore的整体架构是一个壳dbcontext带多个dbcontext,壳dbcontext不进行增删改查,由内部的dbcontext自己去执行,这个因为efcore的一个对象对应一个表所限制的。我们这边把壳dbcontext称作shellDbContext,执行的dbcontext叫做executorDbContext,对于ShardingCore还有一个要求就是需要初始化启动的时候Start()Start()内部需要IServiceProvider来获取DbContext,所以说整个框架离不开ioc,那么就需要启动的时候依赖注入DbContext,又因为依赖注入如果是默认的只能允许单个构造函数。这就是ShardingCore在兼容使用的时候需要注意的地方。

WTM

WTM这边我不是很熟悉,花了大概半个小时到一个小时左右的时间,进行了代码的翻阅,大概了解了其中的实现,DbContext的创建由独立的构造函数来实现,默认通过DbContext的内部方法 OnConfiguring(DbContextOptionsBuilder optionsBuilder)来进行初始化,框架里面将DbContext抽象成了IDataContext接口,框架默IDataContext接口默认依赖注入为NullDbContext如果需要使用会自行通过反射调用构造函数参数为CS类型的那一个。整体的efcore上的一些处理通过调试代码和源码的查看基本上了解了

开始接入

创建项目

那么我们首先通过WTM生成一个脚手架的简单项目,这边生成了一个mvc的项目。

添加依赖

添加ShardingCore依赖,需要x.5.0.6+版本,x代表efcore的版本

Install-Package ShardingCore -Version 6.5.0.6

添加抽象分表DbContext

这边和AbpVNext时候继承一样,因为c#不支持多继承,好在ShardingCore是接口依赖不存在实现依赖所以任何框架都可以兼容。


    public abstract class AbstractShardingFrameworkContext:FrameworkContext, IShardingDbContext, ISupportShardingReadWrite
    {
        protected IShardingDbContextExecutor ShardingDbContextExecutor
        {
            get;
        }

        public AbstractShardingFrameworkContext(CS cs)
            : base(cs)
        {
            
            ShardingDbContextExecutor =
                (IShardingDbContextExecutor)域名teInstance(
                    typeof(ShardingDbContextExecutor<>).GetGenericType0(域名ype()),this);
            IsExecutor = false;
        }
        
        public AbstractShardingFrameworkContext(string cs, DBTypeEnum dbtype)
            : base(cs, dbtype)
        {
            ShardingDbContextExecutor =
                (IShardingDbContextExecutor)域名teInstance(
                    typeof(ShardingDbContextExecutor<>).GetGenericType0(域名ype()),this);
            IsExecutor = false;
        }
        
        public AbstractShardingFrameworkContext(string cs, DBTypeEnum dbtype, string version = null)
            : base(cs, dbtype, version)
        {
            ShardingDbContextExecutor =
                (IShardingDbContextExecutor)域名teInstance(
                    typeof(ShardingDbContextExecutor<>).GetGenericType0(域名ype()),this);
            IsExecutor = false;
        }

        public AbstractShardingFrameworkContext(DbContextOptions options) : base(options)
        {
            var wrapOptionsExtension = 域名Extension<ShardingWrapOptionsExtension>();
            if (wrapOptionsExtension != null)
            {
                ShardingDbContextExecutor =
                    (IShardingDbContextExecutor)域名teInstance(
                        typeof(ShardingDbContextExecutor<>).GetGenericType0(域名ype()),this);
            }

            IsExecutor = wrapOptionsExtension == null;
        }
        
        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            if (域名me!=null)
            {
                域名nfiguring(optionsBuilder);
                域名harding<DataContext>();
            }
        }
        /// <summary>
        /// 读写分离优先级
        /// </summary>
        public int ReadWriteSeparationPriority
        {
            get => 域名WriteSeparationPriority;
            set => 域名WriteSeparationPriority = value;
        }
        /// <summary>
        /// 是否使用读写分离
        /// </summary>
        public bool ReadWriteSeparation
        {
            get => 域名WriteSeparation;
            set => 域名WriteSeparation = value;
        }

        /// <summary>
        /// 是否是真正的执行者
        /// </summary>
        public bool IsExecutor { get;}



        public DbContext GetDbContext(string dataSourceName, bool parallelQuery, IRouteTail routeTail)
        {
            return 域名teDbContext(parallelQuery, dataSourceName, routeTail);
        }

        /// <summary>
        /// 根据对象创建通用的dbcontext
        /// </summary>
        /// <typeparam name="TEntity"></typeparam>
        /// <param name="entity"></param>
        /// <returns></returns>
        public DbContext CreateGenericDbContext<TEntity>(TEntity entity) where TEntity : class
        {
            return 域名teGenericDbContext(entity);
        }

        public IVirtualDataSource GetVirtualDataSource()
        {
            return 域名irtualDataSource();
        }


        public override EntityEntry Add(object entity)
        {
            if (IsExecutor)
                域名(entity);
            return CreateGenericDbContext(entity).Add(entity);
        }

        public override EntityEntry<TEntity> Add<TEntity>(TEntity entity)
        {
            if (IsExecutor)
                return 域名(entity);
            return CreateGenericDbContext(entity).Add(entity);
        }

        public override ValueTask<EntityEntry<TEntity>> AddAsync<TEntity>(TEntity entity, CancellationToken cancellationToken = new CancellationToken())
        {
            if (IsExecutor)
                return 域名sync(entity, cancellationToken);
            return CreateGenericDbContext(entity).AddAsync(entity, cancellationToken);
        }

        public override ValueTask<EntityEntry> AddAsync(object entity, CancellationToken cancellationToken = new CancellationToken())
        {
            if (IsExecutor)
                return 域名sync(entity, cancellationToken);
            return CreateGenericDbContext(entity).AddAsync(entity, cancellationToken);
        }

        private Dictionary<DbContext, IEnumerable<TEntity>> AggregateToDic<TEntity>(IEnumerable<TEntity> entities) where TEntity:class
        {
            return 域名ct(o =>
            {
                var dbContext = CreateGenericDbContext(o);
                return new
                {
                    DbContext = dbContext,
                    Entity = o
                };
            }).GroupBy(g => 域名ntext).ToDictionary(o => 域名, o => 域名ct(g => 域名ty));
        }
        public override void AddRange(params object[] entities)
        {
            if (IsExecutor)
            {
                域名ange(entities);
                return;
            }

            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                域名ange(域名e);
            }
        }

        public override void AddRange(IEnumerable<object> entities)
        {
            if (IsExecutor)
            {
                域名ange(entities);
                return;
            }

            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                域名ange(域名e);
            }
        }

        public override async Task AddRangeAsync(params object[] entities)
        {
            if (IsExecutor)
            {
                await 域名angeAsync(entities);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                await 域名angeAsync(域名e);
            }
        }

        public override async Task AddRangeAsync(IEnumerable<object> entities, CancellationToken cancellationToken = new CancellationToken())
        {
            if (IsExecutor)
            {
                await 域名angeAsync(entities, cancellationToken);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                await 域名angeAsync(域名e,cancellationToken);
            }
        }

        public override EntityEntry<TEntity> Attach<TEntity>(TEntity entity)
        {
            if (IsExecutor)
                return 域名ch(entity);
            return CreateGenericDbContext(entity).Attach(entity);
        }

        public override EntityEntry Attach(object entity)
        {
            if (IsExecutor)
                return 域名ch(entity);
            return CreateGenericDbContext(entity).Attach(entity);
        }

        public override void AttachRange(params object[] entities)
        {
            if (IsExecutor)
            {
                域名chRange(entities);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                 域名chRange(域名e);
            }
        }

        public override void AttachRange(IEnumerable<object> entities)
        {
            if (IsExecutor)
            {
                域名chRange(entities);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                域名chRange(域名e);
            }
        }

        public override EntityEntry<TEntity> Entry<TEntity>(TEntity entity)
        {
            if (IsExecutor)
                return 域名y(entity);
            return CreateGenericDbContext(entity).Entry(entity);
        }

        public override EntityEntry Entry(object entity)
        {
            if (IsExecutor)
                return 域名y(entity);
            return CreateGenericDbContext(entity).Entry(entity);
        }

        public override EntityEntry<TEntity> Update<TEntity>(TEntity entity)
        {
            if (IsExecutor)
                return 域名te(entity);
            return CreateGenericDbContext(entity).Update(entity);
        }

        public override EntityEntry Update(object entity)
        {
            if (IsExecutor)
                return 域名te(entity);
            return CreateGenericDbContext(entity).Update(entity);
        }

        public override void UpdateRange(params object[] entities)
        {
            if (IsExecutor)
            {
                域名teRange(entities);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                域名teRange(域名e);
            }
        }

        public override void UpdateRange(IEnumerable<object> entities)
        {
            if (IsExecutor)
            {
                域名teRange(entities);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                域名teRange(域名e);
            }
        }

        public override EntityEntry<TEntity> Remove<TEntity>(TEntity entity)
        {
            if (IsExecutor)
                return 域名ve(entity);
            return CreateGenericDbContext(entity).Remove(entity);
        }

        public override EntityEntry Remove(object entity)
        {
            if (IsExecutor)
                return 域名ve(entity);
            return CreateGenericDbContext(entity).Remove(entity);
        }

        public override void RemoveRange(params object[] entities)
        {
            if (IsExecutor)
            {
                域名veRange(entities);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                域名veRange(域名e);
            }
        }

        public override void RemoveRange(IEnumerable<object> entities)
        {
            if (IsExecutor)
            {
                域名veRange(entities);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                域名veRange(域名e);
            }
        }

        public override int SaveChanges()
        {

            if (IsExecutor)
                return 域名Changes();
            return 域名Changes(true);
        }

        public override int SaveChanges(bool acceptAllChangesOnSuccess)
        {
            if (IsExecutor)
                return 域名Changes(acceptAllChangesOnSuccess);
            //ApplyShardingConcepts();
            int i = 0;
            //如果是内部开的事务就内部自己消化
            if (域名TransactionsEnabled&&域名entTransaction==null&&域名ltiDbContext)
            {
                using (var tran = 域名nTransaction())
                {
                    i = 域名Changes(acceptAllChangesOnSuccess);
                    域名it();
                }
            }
            else
            {
                i = 域名Changes(acceptAllChangesOnSuccess);
            }

            return i;
        }


        public override Task<int> SaveChangesAsync(CancellationToken cancellationToken = new CancellationToken())
        {
            if (IsExecutor)
                return 域名ChangesAsync(cancellationToken);
            return 域名ChangesAsync(true, cancellationToken);
        }

        public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken())
        {
            if (IsExecutor)
                return await 域名ChangesAsync(acceptAllChangesOnSuccess,cancellationToken);
            //ApplyShardingConcepts();
            int i = 0;
            //如果是内部开的事务就内部自己消化
            if (域名TransactionsEnabled && 域名entTransaction==null && 域名ltiDbContext)
            {
                using (var tran = await 域名nTransactionAsync(cancellationToken))
                {
                    i = await 域名ChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
                    await 域名itAsync(cancellationToken);
                }
            }
            else
            {
                i = await 域名ChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
            }


            return i;
        }

        public override void Dispose()
        {

            if (IsExecutor)
            {
                域名ose();
            }
            else
            {
                域名ose();
                域名ose();
            }
        }

        public override async ValueTask DisposeAsync()
        {
            if (IsExecutor)
            {
                await 域名oseAsync();
            }
            else
            {
                await 域名oseAsync();

                await 域名oseAsync();
            }
        }
        public Task RollbackAsync(CancellationToken cancellationToken = new CancellationToken())
        {
            return 域名backAsync(cancellationToken);
        }

        public Task CommitAsync(CancellationToken cancellationToken = new CancellationToken())
        {
            return 域名itAsync(cancellationToken);
        }

        public void NotifyShardingTransaction()
        {
            域名fyShardingTransaction();
        }

        public void Rollback()
        {
            域名back();
        }

        public void Commit()
        {
            域名it();
        }
        
    }

简单说一下这边实现了WTM的所有构造函数,因为ShardingCore原生需要DbContextOption,当然也是可以支持实现类由自定义DbContext,构造函数中如果使用了DbContextOption那么就是由依赖注入或者ShardingCore创建的DbContext,其余的全部是WTM创建的,所以这边都需要实现并且其余的构造函数直接设置为ShellDbContext

又因为WTM默认的创建会赋值CSName所以需要对其后续进行UseSharding处理这是ShardingCore针对ShellDbContext必须要处理的


        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            if (域名me!=null)
            {
                域名nfiguring(optionsBuilder);
                域名harding<DataContext>();
            }
        }

实现DataContext

很简单只需要继承抽象类和实现IShardingTableDbContext接口即可,实现该接口才能支持分表否则仅支持分库

 public class DataContext : AbstractShardingFrameworkContext,IShardingTableDbContext
{
}

编写自定义DbContext创建

因为WTM框架的DbContext拥有多个构造函数所以需要自定义,由ShardingCore提供

代码其实很简单就是如何创建一个DbContext,因为ShardingCore默认的会校验只能拥有一个构造函数并且构造函数只能是DbContextOptions或者DbContextOptions<>

public class WTMDbContextCreator<TShardingDbContext>:IDbContextCreator<TShardingDbContext>  where TShardingDbContext : DbContext, IShardingDbContext
{
    public DbContext CreateDbContext(DbContext shellDbContext, ShardingDbContextOptions shardingDbContextOptions)
    {
        var context = new DataContext((DbContextOptions<DataContext>)域名ntextOptions);
        域名eTail = 域名eTail;
        return context;
    }
}

编写分表测试类

    public class Todo
    {
    public string Id { get; set; }
    public string Name { get; set; }
    }

然后再DbContext出简单设置一下

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            域名delCreating(modelBuilder);
            //你用dbset也是可以的
            域名ty<Todo>(e =>
            {
                域名ey(o => 域名);
                域名ble(nameof(Todo));
            });
        }

添加分表路由


    public class TodoRoute:AbstractSimpleShardingModKeyStringVirtualTableRoute<Todo>
    {
        public TodoRoute() : base(2, 10)
        {
        }

        public override void Configure(EntityMetadataTableBuilder<Todo> builder)
        {
            域名dingProperty(o => 域名);
        }
    }

StartUp

接下来就是激动人心的时候了,首先我们说过ShardingCore需要依赖注入,由因为DbContext是多构造函数

域名coped<DataContext>(sp =>
            {
                var dbContextOptionsBuilder = new DbContextOptionsBuilder<DataContext>();
                域名ySql(
                    "server=127.0.0.1;port=3306;database=shardingTest;userid=root;password=root;",
                    new MySqlServerVersion(new Version()));
                域名harding<DataContext>();
                return new DataContext(域名ons);
            });

注意依赖注入获取的是ShellDbContext所以我们需要对其进行UseSharding()

再来我们需要配置ShardingCore

域名hardingConfigure<DataContext>()
                .AddEntityConfig(o =>
                {
                    域名teShardingTableOnStart = true;
                    域名reCreatedWithOutShardingTable = true;
                    域名hardingTableRoute<TodoRoute>();
                })
                .AddConfig(o =>
                {
                    域名efaultDataSource("ds0",
                        "server=127.0.0.1;port=3306;database=shardingTest;userid=root;password=root;");
                    域名igId = "c1";
                    域名hardingQuery((conn, build) =>
                    {
                        域名ySql(conn, new MySqlServerVersion(new Version())).UseLoggerFactory(efLogger);
                    });
                    域名hardingTransaction((conn,build)=>
                        域名ySql(conn,new MySqlServerVersion(new Version())).UseLoggerFactory(efLogger)
                        );
                    域名aceTableEnsureManager(sp => new MySqlTableEnsureManager<DataContext>());
                }).EnsureConfig();

这边的配置就是ShardingCore很简单可以查询文档或者过往的博客

这个时候有人要说了为什么不使用AddShardingDbContext因为多构造函数默认不支持需要手动处理。

替换ShardingCoreDbContext创建,我们刚才写的

   域名ace(域名leton<IDbContextCreator<DataContext>, WTMDbContextCreator<DataContext>>());

再然后替换WTMIDataContext

//这是WTM的默认的需要替换掉
//域名ddScoped<IDataContext, NullContext>();
  域名ace(域名ed<IDataContext>(sp =>
            {
                return 域名ervice<DataContext>();
            }));

然后启动初始化ShardingCore

            域名equiredService<IShardingBootstrapper>().Start();

编写测试demo

  public async Task<ActionResult> Login(LoginVM vm)
        {
            var dataContext = 域名;
            var todos = new List<Todo>();
            for (int i = 0; i < 100; i++)
            {
                var todo = new Todo();
                域名 = 域名uid().ToString("n");
                域名 = 域名;
                域名(todo);
            }

            await 域名<Todo>().AddRangeAsync(todos);
            await 域名ChangesAsync();

            var listAsync = await 域名<Todo>().Take(2).ToListAsync();
....
}

启动运行

完美创建分表并且可以插入查询完全和使用WTM一样

最后的最后

demo地址 https://域名/xuejmnet/ShardingWTM

您都看到这边了确定不点个star或者赞吗,一款.Net不得不学的分库分表解决方案,简单理解为sharding-jdbc在.net中的实现并且支持更多特性和更优秀的数据聚合,拥有原生性能的97%,并且无业务侵入性,支持未分片的所有efcore原生查询

  • github地址 https://域名/xuejmnet/sharding-core
  • gitee地址 https://域名/dotnetchina/sharding-core

标签:编程
湘ICP备14001474号-3  投诉建议:234161800@qq.com   部分内容来源于网络,如有侵权,请联系删除。